Wednesday, November 13, 2013

EasyNetQ: Multiple Handlers per Consumer

A common feature request for EasyNetQ has been to have some way of implementing a command pipeline pattern. Say you’ve got a component that is emitting commands. In a .NET application each command would most probably be implemented as a separate class. A command might look something like this:

public class AddUser
{
    public string Username { get; private set; }
    public string Email { get; private set; }

    public AddUser(string username, string email)
    {
        Username = username;
        Email = email;
    }
}

Another component might listen for commands and act on them. Previously in EasyNetQ it would have been difficult to implement this pattern because a consumer (Subscriber) was always bound to a given message type. You would have had to use the lower level IAdvancedBus binary message methods and implement your own serialization and dispatch infrastructure.
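To give a sense of what that hand-rolled dispatch might involve, here is a rough sketch. It is not EasyNetQ code: ManualDispatcher and its members are hypothetical names, and it assumes the message's type name travels alongside the binary body and that the caller supplies a deserializer for each type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled dispatcher; none of these names come from EasyNetQ.
public class ManualDispatcher
{
    // Maps a message type name to a handler that accepts the raw bytes.
    private readonly Dictionary<string, Action<byte[]>> handlers =
        new Dictionary<string, Action<byte[]>>();

    // Register a handler for one message type, together with the function
    // that turns the raw bytes back into that type.
    public ManualDispatcher Add<T>(Func<byte[], T> deserialize, Action<T> handler)
    {
        handlers[typeof(T).FullName] = body => handler(deserialize(body));
        return this;
    }

    // Route a raw message to the handler registered under typeName.
    // Returns false when no handler matches.
    public bool Dispatch(string typeName, byte[] body)
    {
        Action<byte[]> handler;
        if (!handlers.TryGetValue(typeName, out handler))
        {
            return false;
        }
        handler(body);
        return true;
    }
}
```

Every consumer would need something like this, plus the serialization code behind each deserialize function.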

But now EasyNetQ comes with multiple handlers per consumer out of the box.

From version 0.20 there’s a new overload of the Consume method that provides a fluent way for you to register multiple message type handlers to a single consumer, and thus a single queue.

Here’s an example:

var bus = RabbitHutch.CreateBus("host=localhost");

var queue = bus.Advanced.QueueDeclare("multiple_types");

bus.Advanced.Consume(queue, x => x
    .Add<AddUser>((message, info) =>
        {
            Console.WriteLine("Add User {0}", message.Body.Username);
        })
    .Add<DeleteUser>((message, info) =>
        {
            Console.WriteLine("Delete User {0}", message.Body.Username);
        })
    );
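The second handler refers to a DeleteUser command that this post doesn't show. A minimal sketch, assuming it mirrors AddUser and carries only a username:

```csharp
// Assumed shape: the original post does not show DeleteUser.
public class DeleteUser
{
    public string Username { get; private set; }

    public DeleteUser(string username)
    {
        Username = username;
    }
}
```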

Now we can publish multiple message types to the same queue:

bus.Advanced.Publish(Exchange.GetDefault(), queue.Name, false, false,
    new Message<AddUser>(new AddUser("Steve Howe", "steve@worlds-best-guitarist.com")));
bus.Advanced.Publish(Exchange.GetDefault(), queue.Name, false, false,
    new Message<DeleteUser>(new DeleteUser("Steve Howe")));

By default, if a matching handler cannot be found for a message, EasyNetQ will throw an exception. You can change this behaviour, and simply ignore messages that do not have a handler, by setting the ThrowOnNoMatchingHandler property to false, like this:

bus.Advanced.Consume(queue, x => x
    .Add<AddUser>((message, info) =>
        {
            Console.WriteLine("Add User {0}", message.Body.Username);
        })
    .Add<DeleteUser>((message, info) =>
        {
            Console.WriteLine("Delete User {0}", message.Body.Username);
        })
    .ThrowOnNoMatchingHandler = false
    );

Very soon there will be a send/receive pattern implemented at the IBus level to make this even easier. Watch this space!

Happy commanding!

12 comments:

BillE said...

Hi,

The following just broke.

For().Use();

what is the new way to hook into message validation at consumer?




BillE said...

Sorry that was

IMessageValidationStrategy

code in structure map. The interface disappeared.

Mike Hadlow said...

Hi Bill,

Yes, sorry about that, the interface has indeed disappeared. It's been replaced by IHandlerCollectionFactory that produces an IHandlerCollection. Have a look at the HandlerCollection class to see how that's implemented. It's basically looking at the Type field in BasicProperties to work out which handler it should use to handle the message.

Mike Hadlow said...

Why were you using a custom message validator?

BillE said...

I have a bridge between OpenMQ messages coming in as XML (with a known XSD contract) in Java, which routes to RabbitMQ. I am picking up the message with a custom serializer that runs the MS XmlSerializer to make an object model from the original message. These are consumed in .NET via EasyNetQ.

BillE said...

I implemented the validation strategy with an empty method call on CheckMessageType. This was letting the custom serializer run.

Maybe I could have provided more type information when the Java called basicpublish, but that seemed pretty brittle.

Didn't think the whole interface would up and go though. Now that's brittle.

bille

Mike Hadlow said...

Hi Bill,

Once again, sorry for the problems this has caused for you. EasyNetQ is still very much alpha software in that I'm still refining the API. Any 0..0.0 change up to V1.0 will have breaking changes in it and you should review those before upgrading.

In your case, I think you'd be far better served by using the IAdvancedBus.Consume method that passes you the binary payload. You could do your own deserialization outside of EasyNetQ with no worries about validation etc., but still have all the advantages of the thread dispatching, reconnection and error strategy. See the docs for details:

https://github.com/mikehadlow/EasyNetQ/wiki/The-Advanced-API

Feel free to comment here or get in touch with me directly if you've got any further questions.

BillE said...

Thanks for getting back so fast! I was just busting. The code is nice and your latest wave of improvements especially interface marshalling was really cool.

I have used the advanced API a bit in test prototyping in LINQPad. In the current code base, we are using IoC with the IBus instance as a singleton. We made custom conventions for our naming model, and that was sufficient and worked well.

I will run this particular feed through its own connection using AdvancedBus. The only downside is having to do the topology stuff manually. Maybe I can use the conventions class to provide the names.

steelden said...

IMO it would be more LINQ-like to make ThrowOnNoMatchingHandler a method like Add, so we can use it like this:
bus.Advanced.Consume(queue, x => x
    .ThrowOnNoMatchingHandler(true)
    .Add()
    .Add()
    .Add());

Keith said...

I can't figure out how to implement this and handle the message processing asynchronously - can you point me in the right direction, using the same example above?

Cheers!
Keith

Mike Hadlow said...

Hi Keith, try asking this question on the mailing list. It's a far better place for this kind of conversation: https://groups.google.com/forum/#!forum/easynetq

mvbaffa said...

Hi. Can I instantiate more than one consumer on the same queue?
Thanks