Wednesday, July 22, 2009

A First Look at MassTransit

Get the code for this post here:

http://static.mikehadlow.com/MassTransit.Play.zip

I’ve recently been trying out MassTransit as a possible replacement for our current JBOWS (Just a Bunch Of Web Services) architecture. MassTransit is a “lean service bus implementation for building loosely coupled applications using the .NET framework.” It’s a simple service bus based around the idea of asynchronous publish and subscribe. It’s written by Dru Sellers and Chris Patterson, both good guys who have been very quick to respond on both Twitter and the MassTransit Google group.

To start with I wanted to try out the simplest thing possible, a single publisher and a single subscriber. I wanted to be able to publish a message and have the subscriber pick it up.

The first thing to do is get the latest source from the MassTransit Google Code repository and build it.

The core MassTransit infrastructure is provided by a service called MassTransit.RuntimeServices.exe. This is a Windows service built on Top Shelf (be careful what you click on at work when Googling for this :) ). I plan to blog about Top Shelf in the future, but in short it’s a very nice fluent API for building Windows services. One of the nicest things about it is that you can run the service as a console app during development but easily install it as a Windows service in production.

Before running RuntimeServices you have to provide it with a SQL database. I wanted to use my local SQL Server instance, so I opened up the MassTransit.RuntimeServices.exe.config file, commented out the SQL CE NHibernate configuration and uncommented the SQL Server configuration. I also changed the connection string to point to a test database I’d created. I then ran the SetupSQLServer.sql script (under the PreBuiltServices\MassTransit.RuntimeServices folder) against my database to create the required tables.

So let’s start up RuntimeServices by double-clicking MassTransit.RuntimeServices.exe in the bin directory.

[Screenshot: MassTransit.RuntimeServices running in a console window]

A whole load of debug messages are spat out. Also we can see that some new private MSMQs have been automatically created:

[Screenshot: the new private MSMQ queues]

We can also launch MassTransit.SystemView.exe (also in the bin folder) which gives us a nice GUI view of our services:

[Screenshot: MassTransit.SystemView]

I think it shows a list of subscriber queues on the left. If you expand the nodes you can see the types that are subscribed to. I guess the reason that the mt_subscriptions and mt_health_control queues are not shown is that they don’t have any subscriptions associated with them.

Now let’s create the simplest possible subscriber and publisher. First I’ll create a message structure. I want my message class to be shared by my publisher and subscriber, so I’ll create it in its own assembly and then reference that assembly in the publisher and subscriber projects. My message is very simple, just a regular POCO:

namespace MassTransit.Play.Messages
{
    public class NewCustomerMessage
    {
        public string Name { get; set; }
    }
}

Now for the publisher. MassTransit uses the Castle Windsor IoC container by default and log4net so we need to add the following references:

[Screenshot: the assembly references added to the publisher project]

The MassTransit API is configured as a Windsor facility. I’m a big fan of Windsor, so this all makes sense to me. Here’s the Windsor config file:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <bus id="main" endpoint="msmq://localhost/mt_mike_publisher">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>

As you can see, we reference the ‘masstransit’ facility and configure it with two main nodes, bus and transports. Transports is pretty straightforward: we simply specify the MsmqEndpoint. The bus node specifies an id and an endpoint. As far as I understand it, if your service only publishes, then the queue is never used, but MassTransit throws if you don’t specify it. I’m probably missing something here; any clarification will be warmly received :)

Continuing with the configuration: under bus are two child nodes, subscriptionService and managementService. The subscriptionService endpoint specifies the location of the subscription queue which RuntimeServices uses to keep track of subscriptions. This should be the location of the queue created when RuntimeServices starts up for the first time; on my machine it was mt_subscriptions. I’m unsure what the managementService specifies exactly, but I think it’s the subsystem that allows RuntimeServices to monitor the health of the service. I’m assuming that the heartbeatInterval is the number of seconds between each notification.

Next, let’s code our publisher. I’m going to create a simple console application. I would host a service with Top Shelf in production, but right now I want to do the simplest thing possible, so I’m going to keep any other infrastructure out of the equation for the time being. Here’s the publisher code:

using System;
using MassTransit.Play.Messages;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Publisher
{
    public class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Publisher");

            MsmqEndpointConfigurator.Defaults(config =>
            {
                config.CreateMissingQueues = true;
            });

            var container = new DefaultMassTransitContainer("windsor.xml");
            var bus = container.Resolve<IServiceBus>();

            string name;
            while((name = GetName()) != "q")
            {
                var message = new NewCustomerMessage {Name = name};
                bus.Publish(message);
                
                Console.WriteLine("Published NewCustomerMessage with name {0}", message.Name);
            }

            Console.WriteLine("Stopping Publisher");
            container.Release(bus);
            container.Dispose();
        }

        private static string GetName()
        {
            Console.WriteLine("Enter a name to publish (q to quit)");
            return Console.ReadLine();
        }
    }
}

The first statement instructs the MassTransit MsmqEndpointConfigurator to create any missing queues so that we don’t have to manually create the mt_mike_publisher queue. The pattern used here is very common in the MassTransit code, where a static method takes an Action<TConfig> of some configuration class.
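To make that pattern concrete, here is a minimal sketch of how such a static configuration method could be written. It does not use MassTransit at all: QueueOptions and QueueConfigurator are hypothetical names I’ve made up for illustration, and the real MsmqEndpointConfigurator may well work differently inside.

```csharp
using System;

namespace ConfigPatternSketch
{
    // Hypothetical options class, standing in for the real configuration type.
    public class QueueOptions
    {
        public bool CreateMissingQueues { get; set; }
    }

    // Hypothetical static configurator with the same call shape as the publisher code.
    public static class QueueConfigurator
    {
        private static readonly QueueOptions defaults = new QueueOptions();

        public static QueueOptions Current
        {
            get { return defaults; }
        }

        // The caller supplies a lambda that mutates the shared defaults object.
        public static void Defaults(Action<QueueOptions> configure)
        {
            if (configure == null) throw new ArgumentNullException("configure");
            configure(defaults);
        }
    }

    public static class Program
    {
        public static void Main()
        {
            QueueConfigurator.Defaults(config =>
            {
                config.CreateMissingQueues = true;
            });

            Console.WriteLine("CreateMissingQueues = {0}",
                QueueConfigurator.Current.CreateMissingQueues);
        }
    }
}
```

The nice thing about this shape is that the library owns the options object, so it can add new settings later without changing the signature of Defaults.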

The next line creates the DefaultMassTransitContainer. This is a WindsorContainer with the MassTransitFacility registered and all the components needed for MassTransit to run. For us the most important service is the IServiceBus which encapsulates most of the client API. The next line gets the bus from the container.

We then set up a loop getting input from the user, creating a NewCustomerMessage and calling bus.Publish(message). It really is as simple as that.

Let’s look at the subscriber next. The references and windsor.xml config are almost identical to the publisher’s. The only difference is that the bus endpoint should point to a different MSMQ queue: mt_mike_subscriber in my case.
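For reference, this is roughly what the subscriber’s windsor.xml looks like, assuming it is a straight copy of the publisher config with only the bus endpoint changed. I’ve left every other setting exactly as it was for the publisher.

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <!-- Only this endpoint differs from the publisher's config -->
      <bus id="main" endpoint="msmq://localhost/mt_mike_subscriber">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>
```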

In order to subscribe to a message type we first have to create a consumer. The consumer ‘consumes’ the message when it arrives at the bus.

using System;
using MassTransit.Internal;
using MassTransit.Play.Messages;

namespace MassTransit.Play.Subscriber.Consumers
{
    public class NewCustomerMessageConsumer : Consumes<NewCustomerMessage>.All, IBusService
    {
        private IServiceBus bus;
        private UnsubscribeAction unsubscribeAction;

        public void Consume(NewCustomerMessage message)
        {
            Console.WriteLine("Received a NewCustomerMessage with Name : '{0}'", message.Name);
        }

        public void Dispose()
        {
            bus.Dispose();
        }

        public void Start(IServiceBus bus)
        {
            this.bus = bus;
            unsubscribeAction = bus.Subscribe(this);
        }

        public void Stop()
        {
            unsubscribeAction();
        }
    }
}

You create a consumer by implementing the Consumes<TMessage>.All interface and, as Ayende says, it’s a very clever, fluent way of specifying both what needs to be consumed and how it should be consumed. The ‘All’ interface has a single method that needs to be implemented, Consume, and we simply write to the console that the message has arrived. Our consumer also implements IBusService, which gives us places to start and stop the service bus and do the actual subscription.
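If the Consumes<TMessage>.All syntax looks odd, it may help to see how a nested interface inside a generic static class produces it. The following is my own stripped-down re-creation of the idea, not MassTransit’s source. The Consumes class here, the RecordingConsumer and the Deliver helper are all hypothetical and exist only for this sketch.

```csharp
using System;

namespace ConsumesSketch
{
    // Hypothetical re-creation: the outer generic class names the message type,
    // the nested interface names how it should be consumed.
    public static class Consumes<TMessage> where TMessage : class
    {
        public interface All
        {
            void Consume(TMessage message);
        }
    }

    public class NewCustomerMessage
    {
        public string Name { get; set; }
    }

    // Reads as "consumes all NewCustomerMessages".
    public class RecordingConsumer : Consumes<NewCustomerMessage>.All
    {
        public string LastName { get; private set; }

        public void Consume(NewCustomerMessage message)
        {
            LastName = message.Name;
        }
    }

    public static class Program
    {
        // Hands the message to the candidate only if it consumes that message type.
        public static bool Deliver<TMessage>(object candidate, TMessage message)
            where TMessage : class
        {
            var consumer = candidate as Consumes<TMessage>.All;
            if (consumer == null) return false;
            consumer.Consume(message);
            return true;
        }

        public static void Main()
        {
            var consumer = new RecordingConsumer();
            var delivered = Deliver(consumer, new NewCustomerMessage { Name = "Alice" });
            Console.WriteLine("Delivered: {0}, last name: {1}", delivered, consumer.LastName);
        }
    }
}
```

Because the message type is part of the interface type itself, a bus can discover what a consumer handles just by inspecting the interfaces it implements.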

Here’s the Main method of the subscriber console application:

using System;
using Castle.MicroKernel.Registration;
using MassTransit.Play.Subscriber.Consumers;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Subscriber
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Subscriber, hit return to quit");

            MsmqEndpointConfigurator.Defaults(config =>
                {
                    config.CreateMissingQueues = true;
                });

            var container = new DefaultMassTransitContainer("windsor.xml")
                .Register(
                    Component.For<NewCustomerMessageConsumer>().LifeStyle.Transient
                );

            var bus = container.Resolve<IServiceBus>();
            var consumer = container.Resolve<NewCustomerMessageConsumer>();
            consumer.Start(bus);

            Console.ReadLine();
            Console.WriteLine("Stopping Subscriber");
            consumer.Stop();
            container.Dispose();
        }
    }
}

Once again we specify that we want MassTransit to create our queues automatically and create a DefaultMassTransitContainer. The only addition we have to make for our subscriber is to register our consumer so that the bus can resolve it from the container.

Next we simply grab the bus and our consumer from the container and call Start on the consumer, passing it the bus. A nice little bit of double dispatch :)
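The consumer’s Start method keeps hold of whatever bus.Subscribe returns and invokes it later in Stop. Here is a small sketch of that ‘subscribe returns its own undo’ idea, using a toy in-memory bus. TinyBus and the Unsubscribe delegate are hypothetical names of mine, and I’m only assuming the real bus behaves along similar lines.

```csharp
using System;
using System.Collections.Generic;

namespace UnsubscribeSketch
{
    // Hypothetical delegate, standing in for the unsubscribe action type.
    public delegate void Unsubscribe();

    // Toy in-memory bus, only here to show the shape of the pattern.
    public class TinyBus
    {
        private readonly List<Action<string>> handlers = new List<Action<string>>();

        // Registers the handler and returns a delegate that removes it again.
        public Unsubscribe Subscribe(Action<string> handler)
        {
            handlers.Add(handler);
            return () => handlers.Remove(handler);
        }

        // Delivers the message to every current handler and returns how many received it.
        public int Publish(string message)
        {
            var snapshot = handlers.ToArray();
            foreach (var handler in snapshot)
            {
                handler(message);
            }
            return snapshot.Length;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            var bus = new TinyBus();
            Unsubscribe unsubscribe = bus.Subscribe(
                name => Console.WriteLine("Received '{0}'", name));

            Console.WriteLine("Handlers reached: {0}", bus.Publish("first"));

            unsubscribe();
            Console.WriteLine("Handlers reached: {0}", bus.Publish("second"));
        }
    }
}
```

The caller never needs a reference to the bus in order to unsubscribe; holding on to the returned delegate is enough.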

Now we can start up our Publisher and Subscriber and send messages between them.

[Screenshots: the Publisher and Subscriber console windows]

Wow, it works! I got a lot of childish pleasure from starting up multiple instances of my publisher and subscriber on multiple machines and watching the messages go back and forth. But then I’m a simple soul.

Looking at the MSMQ snap-in, we can see that the MSMQ queues have been automatically created for mt_mike_publisher and mt_mike_subscriber.

[Screenshot: the MSMQ snap-in showing the mt_mike_publisher and mt_mike_subscriber queues]

MassTransit System View also shows that the NewCustomerMessage is subscribed to on mt_mike_subscriber. It also shows the current status of our services. You can see that I have been turning them on and off through the morning.

[Screenshot: MassTransit System View showing the NewCustomerMessage subscription and service status]

Overall I’m impressed with MassTransit. Like most new open source projects the documentation is non-existent, but I was able to get started by looking at the Starbucks sample and reading Rhys C’s excellent blog posts. Kudos to Chris Patterson (AKA PhatBoyG) and Dru Sellers for putting such a cool project together.

7 comments:

Anonymous said...

Mike, I'll be keen to hear (assuming you proceed) how you replace your current messaging / soa implementation with MassTransit?

Do you intend to add some abstractions to what you currently have and build unit / integration test around that before "swapping" the services around?

Or are you going for a big bad rewrite?

Just curious, and good luck if you go ahead...

Mike Hadlow said...

Hi Ed,

Right now we have a really nasty spiderweb of web services, shared databases and other abominations. My plan is to gradually introduce a message based architecture, probably using MT.

We're going to be doing our first implementation of it over the next few weeks to put in some new functionality, but in the process I'm going to take the chance to refactor some of the existing pieces.

Of course I'll be following Michael Feathers' advice: decoupling, wrapping with tests and refactoring. No big bad rewrite :)

Eddie Cianci said...

Hi Mike, excellent introductory article on the MT pub/sub setup!

Could you have made the NewCustomerMessageConsumer object take IServiceBus as a primal dependency in the constructor?

Or does MT require a message to have a parameterless ctor?

Mike Hadlow said...

Hi Eddie,

The IBusService's Start method takes IServiceBus as its argument, so there's no need for the consumer to have a dependency on it. I know the way I've done it looks a little awkward; that's because I wanted to show the 'simplest possible thing' in a basic console application. There's a bus configurator that understands the IBusService interface and can do the wiring up for you. I'll try and blog a bit more about that soon.

Anonymous said...

Mike,

When I type the letter q into the publisher, the process does not exit. When I attached to the process, it looked like there was a thread that was still watching the message queue.

I assume that we missed a method call after we fall out of the while loop in your publisher's main method? If so, do you know what it is?

Thanks for the great post!
John Ruiz

Anonymous said...

Mike,

Do you just change the windsor.xml to point to one machine hostname instead of localhost to get multiple publishers and subscribers across PCs?

I tried to point multiple pub/subs to the machine hosting the RuntimeServices but the app crashes as it is unable to find the "bus".

I am new to ESBs in general. I downloaded your code and was looking for some guidance.

Thanks
K

Mike Hadlow said...

Hi Anonymous, this post is getting quite old now, and I haven't been keeping up to date with the latest MassTransit stuff. You are probably better off asking questions on the MT list:
http://groups.google.com/group/masstransit-discuss