Consumer cancellation has been a requested feature of EasyNetQ for a while now. I wasn’t intending to implement it immediately, but a pull request by Daniel White today made me look at the whole issue of consumer cancellation from the point of view of a deleted queue. It led rapidly to a quite straightforward implementation of user cancellations. The wonders of open source software development, and the generosity of people like Daniel, never fail to impress me.
So what is consumer cancellation? It means that you can stop consuming from a queue without having to dispose of the entire IBus instance and close the connection. All the IBus Subscribe methods and the IAdvancedBus Consume methods now return an IDisposable. To stop consuming, just call Dispose like this:
var cancelSubscription = bus.Subscribe<MyMessage>("subscriptionId", MessageHandler);
.
.
// sometime later stop consuming
cancelSubscription.Dispose();
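To make the pattern concrete, here is a minimal, self-contained sketch of a bus that hands back an IDisposable per subscription and also cleans up whatever is still active when the bus itself is disposed. This is not EasyNetQ's actual implementation: ToyBus, its Subscribe signature, and ActiveSubscriptions are hypothetical names invented for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a bus. NOT EasyNetQ's real code; it only
// illustrates the "Subscribe returns IDisposable" cancellation pattern.
public sealed class ToyBus : IDisposable
{
    private readonly object gate = new object();
    private readonly List<Subscription> subscriptions = new List<Subscription>();
    private bool disposed;

    // Number of subscriptions that have not been cancelled yet.
    public int ActiveSubscriptions
    {
        get { lock (gate) { return subscriptions.Count; } }
    }

    // Returns a handle; disposing it cancels only this subscription.
    public IDisposable Subscribe(string subscriptionId, Action<string> onMessage)
    {
        lock (gate)
        {
            if (disposed) throw new ObjectDisposedException(nameof(ToyBus));
            var subscription = new Subscription(this, subscriptionId, onMessage);
            subscriptions.Add(subscription);
            return subscription;
        }
    }

    // Disposing the bus drops every subscription that is still active.
    public void Dispose()
    {
        lock (gate)
        {
            disposed = true;
            subscriptions.Clear();
        }
    }

    private void Remove(Subscription subscription)
    {
        lock (gate) { subscriptions.Remove(subscription); }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ToyBus owner;
        public string Id { get; }
        public Action<string> OnMessage { get; }

        public Subscription(ToyBus owner, string id, Action<string> onMessage)
        {
            this.owner = owner;
            Id = id;
            OnMessage = onMessage;
        }

        // Safe to call more than once: removing an absent item is a no-op.
        public void Dispose()
        {
            owner.Remove(this);
        }
    }
}
```

With this shape, a caller can cancel a single subscription early by disposing its handle, or ignore the handles entirely and rely on disposing the bus at shutdown.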
9 comments:
Does the bus hold an internal list of all the subscriptions it maintains?
Right now, I have a service running which registers some subscriptions in the OnStart method. When I update, will I need to maintain a list of all my subscriptions to dispose of myself in the OnStop method, or will the framework automatically handle it for me when I call dispose on the bus?
Jason, yes the bus maintains a list of all the subscriptions that you make. When you dispose of the bus, all the subscriptions get disposed as well. Nothing's changed in that respect.
What about the scenario when the queue is deleted by the server? How can the client attempt to re-create the queue?
... i.e., how best to handle the BasicCancel event when it is received?
If the queue is deleted, EasyNetQ simply stops consuming from it. Attempting to re-create it is probably wrong because we assume that the queue was deleted for a reason.
I agree, it sounds odd, but that seems to be the policy of CloudAMQP. When they suffer network outages such as DDoS attacks, our queue is deleted as opposed to the bus connection being dropped. Are you saying there is no extensibility in ENQ for BasicCancel at all?
Not presently. Basic.cancel immediately stops and disposes of any consumers on the queue. The only way for you to change that behavior would be to implement your own PersistentConsumer and IConsumerFactory, then register your IConsumerFactory with EasyNetQ.
Basic.cancel is an instruction from the broker to stop consuming, so doing anything else is effectively breaking AMQP. CloudAMQP shouldn't unilaterally delete durable queues. Declaring a queue as durable is telling the broker that you expect it to survive restarts and network connection losses.
According to this, a basic cancel event can be issued in a clustered scenario when the node on which the queue is located fails - in order to notify the client.
So it isn't that CloudAMQP are deleting the queue, it's just that the same basic cancel event is raised when a node goes down. Guess this is something we'll need to accommodate...
OK, that's interesting. It might be worth doing some cluster fail-over tests to see what the actual behavior is.