Get the code for this post here:
http://static.mikehadlow.com/MassTransit.Play.zip
I’ve recently been trying out MassTransit as a possible replacement for our current JBOWS (Just a Bunch Of Web Services) architecture. MassTransit is a “lean service bus implementation for building loosely coupled applications using the .NET framework.” It’s a simple service bus based around the idea of asynchronous publish and subscribe. It’s written by Dru Sellers and Chris Patterson, both good guys who have been very quick to respond on both Twitter and the MassTransit Google group.
To start with I wanted to try out the simplest thing possible, a single publisher and a single subscriber. I wanted to be able to publish a message and have the subscriber pick it up.
The first thing to do is get the latest source from the MassTransit Google Code repository and build it.
The core MassTransit infrastructure is provided by a service called MassTransit.RuntimeServices.exe. This is a Windows service built on Top Shelf (be careful what you click on at work when Googling for this :) ). I plan to blog about Top Shelf in the future, but in short it’s a very nice fluent API for building Windows services. One of the nicest things about it is that you can run the service as a console app during development but easily install it as a Windows service in production.
Before running RuntimeServices you have to provide it with a SQL database. I wanted to use my local SQL Server instance, so I opened up the MassTransit.RuntimeServices.exe.config file, commented out the SQL CE NHibernate configuration and uncommented the SQL Server configuration. I also changed the connection string to point to a test database I’d created. I then ran the SetupSQLServer.sql script (under the PreBuiltServices\MassTransit.RuntimeServices folder) against my database to create the required tables.
So let’s start up RuntimeServices by double clicking the MassTransit.RuntimeServices.exe in the bin directory.
A whole load of debug messages are spat out. Also we can see that some new private MSMQs have been automatically created:
We can also launch MassTransit.SystemView.exe (also in the bin folder) which gives us a nice GUI view of our services:
I think it shows a list of subscriber queues on the left. If you expand the nodes you can see the types that are subscribed to. I guess the reason that the mt_subscriptions and mt_health_control queues are not shown is that they don’t have any subscriptions associated with them.
Now let’s create the simplest possible subscriber and publisher. First I’ll create a message structure. I want my message class to be shared by my publisher and subscriber, so I’ll create it in its own assembly and then reference that assembly in the publisher and subscriber projects. My message is very simple, just a regular POCO:
namespace MassTransit.Play.Messages
{
    public class NewCustomerMessage
    {
        public string Name { get; set; }
    }
}
Now for the publisher. MassTransit uses the Castle Windsor IoC container by default and log4net so we need to add the following references:
The MassTransit API is configured as a Windsor facility. I’m a big fan of Windsor, so this all makes sense to me. Here’s the Windsor config file:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <bus id="main" endpoint="msmq://localhost/mt_mike_publisher">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>
As you can see, we reference the ‘masstransit’ facility and configure it with two main nodes, bus and transports. Transports is pretty straightforward: we simply specify the MsmqEndpoint. The bus node specifies an id and an endpoint. As far as I understand it, if your service only publishes, then the queue is never used, but MassTransit throws if you don’t specify it. I’m probably missing something here; any clarification will be warmly received :)
Continuing with the configuration: under bus are two child nodes, subscriptionService and managementService. The subscriptionService endpoint specifies the location of the subscription queue, which RuntimeServices uses to keep track of subscriptions. This should be the queue created when RuntimeServices starts up for the first time; on my machine it was mt_subscriptions. I’m unsure what the managementService specifies exactly, but I think it’s the subsystem that allows RuntimeServices to monitor the health of the service. I’m assuming that the heartbeatInterval is the number of seconds between each notification.
Next, let’s code our publisher. I’m going to create a simple console application. I would host a service with Top Shelf in production, but right now I want to do the simplest thing possible, so I’m going to keep any other infrastructure out of the equation for the time being. Here’s the publisher code:
using System;
using MassTransit.Play.Messages;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Publisher
{
    public class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Publisher");

            MsmqEndpointConfigurator.Defaults(config =>
            {
                config.CreateMissingQueues = true;
            });

            var container = new DefaultMassTransitContainer("windsor.xml");
            var bus = container.Resolve<IServiceBus>();

            string name;
            while((name = GetName()) != "q")
            {
                var message = new NewCustomerMessage {Name = name};
                bus.Publish(message);
                Console.WriteLine("Published NewCustomerMessage with name {0}", message.Name);
            }

            Console.WriteLine("Stopping Publisher");
            container.Release(bus);
            container.Dispose();
        }

        private static string GetName()
        {
            Console.WriteLine("Enter a name to publish (q to quit)");
            return Console.ReadLine();
        }
    }
}
The first statement instructs the MassTransit MsmqEndpointConfigurator to create any missing queues so that we don’t have to manually create the mt_mike_publisher queue. The pattern used here is very common in the MassTransit code, where a static method takes an Action<TConfig> of some configuration class.
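To make that pattern concrete, here is a minimal, self-contained sketch of a static method that takes an Action<TConfig>. It does not use MassTransit at all: EndpointSettings, EndpointConfigurator and ConfigDemo are hypothetical names invented for illustration, and the real MsmqEndpointConfigurator may well be implemented differently.

```csharp
using System;

// Hypothetical settings object; not a MassTransit type.
public class EndpointSettings
{
    public bool CreateMissingQueues { get; set; }
}

// Hypothetical static configurator showing the Defaults(Action<TConfig>) shape.
public static class EndpointConfigurator
{
    private static readonly EndpointSettings defaults = new EndpointSettings();

    // Exposes the shared defaults so callers can read what was configured.
    public static EndpointSettings Current
    {
        get { return defaults; }
    }

    // The caller passes a lambda that mutates the shared settings object.
    public static void Defaults(Action<EndpointSettings> configure)
    {
        if (configure == null) throw new ArgumentNullException("configure");
        configure(defaults);
    }
}

public static class ConfigDemo
{
    public static void Main()
    {
        EndpointConfigurator.Defaults(config =>
        {
            config.CreateMissingQueues = true;
        });

        Console.WriteLine(EndpointConfigurator.Current.CreateMissingQueues);
    }
}
```

The appeal of this shape is that the library owns the settings object and the caller only states what should differ from the defaults.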
The next line creates the DefaultMassTransitContainer. This is a WindsorContainer with the MassTransitFacility registered and all the components needed for MassTransit to run. For us the most important service is the IServiceBus which encapsulates most of the client API. The next line gets the bus from the container.
We then set up a loop getting input from the user, creating a NewCustomerMessage and calling bus.Publish(message). It really is as simple as that.
Let’s look at the subscriber next. The references and Windsor.xml config are almost identical to the publisher. The only difference is that the bus endpoint should point to a different MSMQ queue: mt_mike_subscriber in my case.
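For reference, this is roughly what the subscriber’s Windsor config would look like, assuming everything except the bus endpoint is copied unchanged from the publisher config shown earlier:

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <!-- Only this endpoint differs from the publisher's config -->
      <bus id="main" endpoint="msmq://localhost/mt_mike_subscriber">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>
```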
In order to subscribe to a message type we first have to create a consumer. The consumer ‘consumes’ the message when it arrives at the bus.
using System;
using MassTransit.Internal;
using MassTransit.Play.Messages;

namespace MassTransit.Play.Subscriber.Consumers
{
    public class NewCustomerMessageConsumer : Consumes<NewCustomerMessage>.All, IBusService
    {
        private IServiceBus bus;
        private UnsubscribeAction unsubscribeAction;

        public void Consume(NewCustomerMessage message)
        {
            Console.WriteLine(string.Format("Received a NewCustomerMessage with Name : '{0}'", message.Name));
        }

        public void Dispose()
        {
            bus.Dispose();
        }

        public void Start(IServiceBus bus)
        {
            this.bus = bus;
            unsubscribeAction = bus.Subscribe(this);
        }

        public void Stop()
        {
            unsubscribeAction();
        }
    }
}
You create a consumer by implementing the Consumes<TMessage>.All interface and, as Ayende says, it’s a very clever, fluent way of specifying both what needs to be consumed and how it should be consumed. The ‘All’ interface has a single method that needs to be implemented, Consume, and we simply write to the console that the message has arrived. Our consumer also implements IBusService, which gives us places to start and stop the service bus and do the actual subscription.
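If the Consumes<TMessage>.All syntax looks odd, it may help to see how an interface nested inside a generic class can be declared. The sketch below is a guess at the general shape, not MassTransit’s source: Handles<TMessage>, Greeting, GreetingPrinter and NestedInterfaceDemo are hypothetical names made up for this example.

```csharp
using System;

// Hypothetical stand-in for the shape of Consumes<TMessage>.All;
// this is not MassTransit's actual source.
public static class Handles<TMessage> where TMessage : class
{
    // "All" reads as: handle every message of type TMessage.
    public interface All
    {
        void Consume(TMessage message);
    }
}

// A plain message class, like NewCustomerMessage in the post.
public class Greeting
{
    public string Text { get; set; }
}

// The base list reads almost like a sentence: handles all Greeting messages.
public class GreetingPrinter : Handles<Greeting>.All
{
    public string LastText { get; private set; }

    public void Consume(Greeting message)
    {
        LastText = message.Text;
        Console.WriteLine("Consumed: " + message.Text);
    }
}

public static class NestedInterfaceDemo
{
    public static void Main()
    {
        Handles<Greeting>.All consumer = new GreetingPrinter();
        consumer.Consume(new Greeting { Text = "hello" });
    }
}
```

The outer generic class carries the message type and the nested interface carries the policy, so further policies could sit alongside All without changing how the message type is named.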
Here’s the Main method of the subscription console application:
using System;
using Castle.MicroKernel.Registration;
using MassTransit.Play.Subscriber.Consumers;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Subscriber
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Subscriber, hit return to quit");

            MsmqEndpointConfigurator.Defaults(config =>
            {
                config.CreateMissingQueues = true;
            });

            var container = new DefaultMassTransitContainer("windsor.xml")
                .Register(
                    Component.For<NewCustomerMessageConsumer>().LifeStyle.Transient
                );

            var bus = container.Resolve<IServiceBus>();
            var consumer = container.Resolve<NewCustomerMessageConsumer>();
            consumer.Start(bus);

            Console.ReadLine();
            Console.WriteLine("Stopping Subscriber");
            consumer.Stop();
            container.Dispose();
        }
    }
}
Once again we specify that we want MassTransit to create our queues automatically and create a DefaultMassTransitContainer. The only addition we have to make for our subscriber is to register our consumer so that the bus can resolve it from the container.
Next we simply grab the bus and our consumer from the container and call start on the consumer passing it the bus. A nice little bit of double dispatch :)
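The Start/Subscribe/Stop handshake can be sketched without any queues at all. The toy below is an in-memory stand-in, assuming only what the consumer code above shows: Subscribe hands back a delegate that undoes the subscription. ToyBus, ToyConsumer, ToyBusDemo and the Unsubscribe delegate are hypothetical names, not MassTransit types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical delegate returned by Subscribe; calling it removes the subscription.
public delegate bool Unsubscribe();

// Hypothetical in-memory bus: no MSMQ, no MassTransit types.
public class ToyBus
{
    private readonly List<Action<string>> handlers = new List<Action<string>>();

    public Unsubscribe Subscribe(Action<string> handler)
    {
        handlers.Add(handler);
        return () => handlers.Remove(handler);
    }

    public void Publish(string message)
    {
        // Iterate over a copy so a handler may unsubscribe during delivery.
        foreach (var handler in handlers.ToArray())
        {
            handler(message);
        }
    }
}

public class ToyConsumer
{
    private Unsubscribe unsubscribe;

    public int Received { get; private set; }

    public void Start(ToyBus bus)
    {
        // The caller hands us the bus and we call straight back into it.
        unsubscribe = bus.Subscribe(Consume);
    }

    public void Stop()
    {
        if (unsubscribe != null) unsubscribe();
    }

    private void Consume(string message)
    {
        Received++;
        Console.WriteLine("Received: " + message);
    }
}

public static class ToyBusDemo
{
    public static void Main()
    {
        var bus = new ToyBus();
        var consumer = new ToyConsumer();

        consumer.Start(bus);
        bus.Publish("first");   // delivered to the consumer
        consumer.Stop();
        bus.Publish("second");  // nobody is subscribed any more
    }
}
```

Returning the undo action from Subscribe means the consumer never needs to know how the bus stores its subscriptions; it just keeps the delegate and invokes it in Stop.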
Now we can start up our Publisher and Subscriber and send messages between them.
Wow it works! I got a lot of childish pleasure from starting up multiple instances of my publisher and subscriber on multiple machines and watching the messages go back and forth. But then I’m a simple soul.
Looking at the MSMQ snap-in, we can see that the MSMQ queues have been automatically created for mt_mike_publisher and mt_mike_subscriber.
MassTransit System View also shows that the NewCustomerMessage is subscribed to on mt_mike_subscriber. It also shows the current status of our services. You can see that I have been turning them on and off through the morning.
Overall I’m impressed with MassTransit. Like most new open source projects the documentation is non-existent, but I was able to get started by looking at the Starbucks sample and reading Rhys C’s excellent blog posts. Kudos to Chris Patterson (AKA PhatBoyG) and Dru Sellers for putting such a cool project together.