Monday, November 26, 2012

RabbitMQ On Windows With .NET, A Case Study

Any reader of this blog will know that my big project over the last year has been to create a simple .NET API for RabbitMQ called EasyNetQ. I’ve been working as a software architect at 15Below for the last year and a half. The prime motivation for writing EasyNetQ was so that our developers would have an easy API for working with RabbitMQ on .NET. I was very fortunate that our founder and technical director, John Clynes, supported my wish to build it as an open source library. I originally wrote this post for the VMware blog as a case study of running RabbitMQ in a Microsoft environment.
15Below is based in Brighton on the south coast of England, famous for its Regency pavilion and Victorian pier. We provide messaging and integration services for the travel industry. Our clients include Ryanair, Qantas, JetBlue, Thomas Cook and around 30 other airline and rail customers. We send hundreds of millions of transactional notifications each year to our customers’ passengers.
RabbitMQ has helped us to significantly simplify and stabilise our software. It’s one of those black boxes that you install, configure, and then really don’t have to worry about. In over a year of production we’ve found it to be extremely stable and reliable.
Prior to introducing RabbitMQ our applications would use SQL Server as a queuing mechanism. Each task would be represented by a row in a workflow table. Each process in the workflow would poll the table looking for rows that matched its status, process the rows in a batch, and then update the rows’ status field for the next process to pick up. Each step in the process would be hosted by an application service that implemented its own threading model, often using a different approach to all the other services. This created highly coupled software, with workflow steps and infrastructure concerns, such as threading and load balancing, mixed together with business logic. We also discovered that a relational database is not a natural fit for a queuing system. The contention on the workflow tables is high, with constant inserts, selects and updates causing locking issues. Deleting completed items is also problematic on highly indexed tables and we had considerable problems with continuously growing tables.
I wrote about the ‘Database As Queue Anti-Pattern’ in a previous post in more detail.
RabbitMQ provides a number of features that helped us overcome these problems. Firstly it is designed from the beginning as a high-performance messaging platform. It easily outperformed our SQL Server based solution with none of its locking or deletion problems. Rabbit’s event-oriented messaging model also takes away much of the need for complex multi-threaded batch processing code that was previously a cause of instability in our systems.
We first introduced RabbitMQ about 18 months ago as the core infrastructure behind our Flight Status product. We wanted a high performance messaging product with a proven track record that supported common messaging patterns, such as publish/subscribe and request/response. A further requirement was that it should provide automatic work distribution and load balancing.
The need to support messaging patterns ruled out simple store-and-forward queues such as MSMQ and ActiveMQ. We were very impressed by ZeroMQ, but decided that we really needed the centralised manageability of a broker based product. This left RabbitMQ. Although support for AMQP, an open messaging standard, wasn’t in our list of requirements, its implementation by RabbitMQ made us more confident that we were choosing a sustainable strategy.
We are very much a Microsoft shop, so we had some initial concerns about RabbitMQ’s performance and stability on Windows. We were reassured by reading some accounts of RabbitMQ’s and indeed Erlang’s use on Windows by organisations with some very impressive load requirements. Subsequent experience has borne these reports out, and we have found RabbitMQ on Windows Server 2008 to be rock solid.
As a Microsoft shop, our development platform is .NET. Although VMware provide an AMQP C# client, it is a low-level API, not suitable for use by more junior developers. For this reason we created our own high-level .NET API for RabbitMQ that provides simple, single-method access to common messaging patterns and does not require a deep knowledge of AMQP. This API is called EasyNetQ. We’ve open sourced it and, with over 3000 downloads, it is now the leading high-level API for RabbitMQ with .NET. You can find more information about it at EasyNetQ.com. We would recommend looking at it if you are a .NET shop using RabbitMQ.
15Below’s Flight-Status product provides real-time flight information to passengers and their family and friends. We interface with the airline’s real-time flight information stream generated from their operational systems and provide a platform that allows them to apply complex business logic to this stream. We render customer-tailored output, and communicate with the airline’s customers via a range of channels, including email, SMS, voice and iPhone/Android push. RabbitMQ allows us to build each piece (the client for the flight information stream, the message renderer, the sink channels and the business logic) as separate components that communicate using structured messages. Our architecture looks something like this:
rabbit_based_architecture
The green boxes are our core product systems, the blue boxes represent custom code that we write for each customer. A ‘customer saga’ is code that models a long-running business process and includes all the workflow logic for a particular customer’s flight information requirements. A ‘core product service’ is an independent service that implements a feature of our product. An example would be the service that takes flight information and combines it with a customer defined template to create an email to be sent to a passenger. Constructing services as independently deployable and runnable applications gives us great flexibility and scalability. If we need to scale up a particular component, we simply install more copies. RabbitMQ’s automatic work sharing feature means that we can do this without any reconfiguration of existing components. This architecture also makes it easy to test each application service in isolation since it’s simply a question of firing messages at the service and watching its response.
In conclusion, RabbitMQ has provided a rock solid piece of infrastructure with the features to allow us to significantly reduce the architectural complexity of our systems. We can now build software for our clients faster and more reliably. It scales to higher loads than our previous relational-database based systems and is more flexible in the face of changing customer requirements.

Friday, November 23, 2012

Using BlockingCollection To Communicate Between Threads

Consider these (somewhat) common programming challenges:

  • I’m using a third party library that is not thread safe, but I want my application to share work between multiple threads. How do I marshal calls from my multi-threaded code to the single-threaded library?
  • I have a single source of events on a single thread. How do I share the work between a pool of multiple threads?
  • I have multiple threads emitting events. How do I consume them on a single thread?

One way of doing this would be to have some shared state, a field or a property on a static class, and wrap locks around it so that multiple threads can access it safely. This is a pretty common way of trying to skin this particular cat, but it’s shot through with traps for the unwary. Also, it can hurt performance because access to the shared resource is serialized, even though the things accessing it are running in parallel.

A better way is to use a BlockingCollection and have your threads communicate via message classes.

BlockingCollection is a class in the System.Collections.Concurrent namespace that arrived with .NET 4.0. By default it wraps a ConcurrentQueue, although you can swap this for a ConcurrentStack or a ConcurrentBag if you want. You push objects in at one end and sit in a loop consuming them from the other. The producers and consumers, one or many of each, can run on different threads without any explicit locks in your code. That’s OK because the Concurrent namespace collection classes are guaranteed to be thread safe. The ‘blocking’ part of the name is there because the consuming end blocks until an object is available. Justin Etheredge has an excellent post that looks at BlockingCollection in more detail here.
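
To make the point about swapping the backing store concrete, here is a minimal sketch. The class and method names (BackingStoreDemo, FirstTaken) are made up for illustration; the only library calls are the BlockingCollection constructor that accepts an IProducerConsumerCollection, plus Add and Take.

```csharp
using System.Collections.Concurrent;

public static class BackingStoreDemo
{
    // Adds 1, 2, 3 and then takes one item from a collection backed by the given store.
    public static int FirstTaken(IProducerConsumerCollection<int> store)
    {
        using (var collection = new BlockingCollection<int>(store))
        {
            collection.Add(1);
            collection.Add(2);
            collection.Add(3);
            return collection.Take();
        }
    }

    // FIFO: a queue hands back the oldest item first, so this returns 1.
    public static int FromQueue()
    {
        return FirstTaken(new ConcurrentQueue<int>());
    }

    // LIFO: a stack hands back the newest item first, so this returns 3.
    public static int FromStack()
    {
        return FirstTaken(new ConcurrentStack<int>());
    }
}
```

The producer and consumer code stays the same whichever store you pick; only the order in which items come out changes.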

For an example, let’s implement a parallel pipeline. A ventilator produces tasks to be processed in parallel, a set of workers process the tasks on separate threads, and a sink collects the results back together again. It shows both one-to-many and many-to-one thread communication. I’ve stolen the idea and the diagram from the excellent ZeroMQ Guide:

zguide_parallel_workers

First we’ll need a class that represents a piece of work, we’ll keep it super simple for this example:

public class WorkItem
{
    public string Text { get; set; }
}

We’ll need two BlockingCollections, one to take the tasks from the ventilator to the workers, and another to take the finished work from the workers to the sink:

var ventilatorQueue = new BlockingCollection<WorkItem>();
var sinkQueue = new BlockingCollection<WorkItem>();

Now let’s write our ventilator:

public static void StartVentilator(BlockingCollection<WorkItem> ventilatorQueue)
{
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            ventilatorQueue.Add(new WorkItem { Text = string.Format("Item {0}", i) });
        }
    }, TaskCreationOptions.LongRunning);
}

It just iterates 100 times creating work items and pushing them on the ventilatorQueue.

Here is a worker:

public static void StartWorker(int workerNumber,
    BlockingCollection<WorkItem> ventilatorQueue,
    BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in ventilatorQueue.GetConsumingEnumerable())
        {
            // pretend to take some time to process
            Thread.Sleep(30);
            workItem.Text = workItem.Text + " processed by worker " + workerNumber;
            sinkQueue.Add(workItem);
        }
    }, TaskCreationOptions.LongRunning);
}

BlockingCollection provides a GetConsumingEnumerable method that yields each item in turn. It blocks if there are no items on the queue. Note that I’m not worrying about shutdown patterns in this code. In production code you’d need to worry about how to close down your worker threads.
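
For what it’s worth, one possible shutdown pattern is sketched below. It is not the code from this example: ShutdownSketch and Run are hypothetical names, and it assumes the producer knows when it has finished. CompleteAdding tells the collection that no more items are coming, which lets each GetConsumingEnumerable loop exit once the queue is empty, and Task.WaitAll then waits for the workers.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    // Runs a producer and several workers to completion and
    // returns how many items the workers processed.
    public static int Run(int itemCount, int workerCount)
    {
        var queue = new BlockingCollection<int>();
        var results = new ConcurrentBag<int>();

        var workers = new List<Task>();
        for (int w = 0; w < workerCount; w++)
        {
            workers.Add(Task.Factory.StartNew(() =>
            {
                // The loop ends once CompleteAdding has been called and the queue is empty.
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    results.Add(item);
                }
            }, TaskCreationOptions.LongRunning));
        }

        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }

        // Signal that no more items will arrive, so blocked consumers can exit.
        queue.CompleteAdding();

        // Wait for every worker to drain the queue and finish.
        Task.WaitAll(workers.ToArray());
        return results.Count;
    }
}
```

Calling ShutdownSketch.Run(100, 3) should return 100 once all three workers have exited.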

Next let’s write our sink:

public static void StartSink(BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in sinkQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Processed Messsage: {0}", workItem.Text);
        }
    }, TaskCreationOptions.LongRunning);
}

Once again, this sits in an infinite foreach loop consuming items from the sinkQueue.

Finally we need to wire up the pieces and kick it off:

StartSink(sinkQueue);

StartWorker(0, ventilatorQueue, sinkQueue);
StartWorker(1, ventilatorQueue, sinkQueue);
StartWorker(2, ventilatorQueue, sinkQueue);

StartVentilator(ventilatorQueue);

I’ve started the sink first, then the workers and finally the producer. It doesn’t overly matter what order they start in since the queues will store any tasks the ventilator creates before the workers and the sink start.

Running the code I get output something like this:

Processed Messsage: Item 1 processed by worker 1
Processed Messsage: Item 2 processed by worker 0
Processed Messsage: Item 0 processed by worker 2
Processed Messsage: Item 5 processed by worker 2
Processed Messsage: Item 3 processed by worker 1

....

Processed Messsage: Item 95 processed by worker 0
Processed Messsage: Item 98 processed by worker 0
Processed Messsage: Item 97 processed by worker 2
Processed Messsage: Item 96 processed by worker 1
Processed Messsage: Item 99 processed by worker 0

This pattern is a great way of decoupling the communication between a source and a sink, or a producer and a consumer. It also allows you to have multiple sources and multiple sinks, but primarily it’s a safe way for multiple threads to interact.

The complete example is here on GitHub.

Thursday, November 15, 2012

A C# .NET Client Proxy For The RabbitMQ Management API

RabbitMQ comes with a very nice Management UI and an HTTP JSON API that allow you to configure and monitor your RabbitMQ broker. From the website:

The rabbitmq-management plugin provides an HTTP-based API for management and monitoring of your RabbitMQ server, along with a browser-based UI and a command line tool, rabbitmqadmin. Features include:

  • Declare, list and delete exchanges, queues, bindings, users, virtual hosts and permissions.
  • Monitor queue length, message rates globally and per channel, data rates per connection, etc.
  • Send and receive messages.
  • Monitor Erlang processes, file descriptors, memory use.
  • Export / import object definitions to JSON.
  • Force close connections, purge queues.

Wouldn’t it be cool if you could do all these management tasks from your .NET code? Well now you can. I’ve just added a new project to EasyNetQ called EasyNetQ.Management.Client. This is a .NET client-side proxy for the HTTP-based API.

It’s on NuGet, so to install it, you simply run:

PM> Install-Package EasyNetQ.Management.Client

To give an overview of the sort of things you can do with EasyNetQ.Management.Client, have a look at this code. It first creates a new Virtual Host and a User, and gives the User permissions on the Virtual Host. Then it re-connects as the new user, creates an exchange and a queue, binds them, and publishes a message to the exchange. Finally it gets the first message from the queue and outputs it to the console.

var initial = new ManagementClient("http://localhost", "guest", "guest");

// first create a new virtual host
var vhost = initial.CreateVirtualHost("my_virtual_host");

// next create a user for that virtual host
var user = initial.CreateUser(new UserInfo("mike", "topSecret"));

// give the new user all permissions on the virtual host
initial.CreatePermission(new PermissionInfo(user, vhost));

// now log in again as the new user
var management = new ManagementClient("http://localhost", user.name, "topSecret");

// test that everything's OK
management.IsAlive(vhost);

// create an exchange
var exchange = management.CreateExchange(new ExchangeInfo("my_exchange", "direct"), vhost);

// create a queue
var queue = management.CreateQueue(new QueueInfo("my_queue"), vhost);

// bind the exchange to the queue
management.CreateBinding(exchange, queue, new BindingInfo("my_routing_key"));

// publish a test message
management.Publish(exchange, new PublishInfo("my_routing_key", "Hello World!"));

// get any messages on the queue
var messages = management.GetMessagesFromQueue(queue, new GetMessagesCriteria(1, false));

foreach (var message in messages)
{
    Console.Out.WriteLine("message.payload = {0}", message.payload);
}

This library is also ideal for monitoring queue levels, channels and connections on your RabbitMQ broker. For example, this code prints out details of all the current connections to the RabbitMQ broker:

var connections = managementClient.GetConnections();

foreach (var connection in connections)
{
    Console.Out.WriteLine("connection.name = {0}", connection.name);
    Console.WriteLine("user:\t{0}", connection.client_properties.user);
    Console.WriteLine("application:\t{0}", connection.client_properties.application);
    Console.WriteLine("client_api:\t{0}", connection.client_properties.client_api);
    Console.WriteLine("application_location:\t{0}", connection.client_properties.application_location);
    Console.WriteLine("connected:\t{0}", connection.client_properties.connected);
    Console.WriteLine("easynetq_version:\t{0}", connection.client_properties.easynetq_version);
    Console.WriteLine("machine_name:\t{0}", connection.client_properties.machine_name);
}

On my machine, with one consumer running it outputs this:
 
connection.name = [::1]:64754 -> [::1]:5672
user: guest
application: EasyNetQ.Tests.Performance.Consumer.exe
client_api: EasyNetQ
application_location: D:\Source\EasyNetQ\Source\EasyNetQ.Tests.Performance.Consumer\bin\Debug
connected: 14/11/2012 15:06:19
easynetq_version: 0.9.0.0
machine_name: THOMAS

You can see the name of the application that’s making the connection, the machine it’s running on and even its location on disk. That’s rather nice. From this information it wouldn’t be too hard to auto-generate a complete system diagram of your distributed messaging application. Now there’s an idea :)

For more information, check out the documentation.

Monday, November 12, 2012

Nicer Client Properties For EasyNetQ

EasyNetQ is my lightweight easy-to-use .NET API for RabbitMQ.

Today I added a small but very nice feature: better client properties. Now when you look at connections created by EasyNetQ you can see the machine that connected, the application and the application’s location on disk. It also gives you the date and time that EasyNetQ first connected. Very useful for debugging.

Here’s an example. Check out the ‘Client Properties’ section.

nice_connection_properties

Tuesday, November 06, 2012

EasyNetQ Publisher Confirms

EasyNetQ is my easy-to-use .NET API for RabbitMQ.

The default AMQP publish is not transactional and doesn't guarantee that your message will actually reach the broker. AMQP does specify a transactional publish, but with RabbitMQ it is extremely slow, around 200 times slower than a non-transactional publish, and we haven't supported it via the EasyNetQ API. For high-performance guaranteed delivery it's recommended that you use 'Publisher Confirms'. Simply speaking, this is an extension to AMQP that provides a call-back when your message has been successfully received by the broker.

What does 'successfully received' mean? It depends ...

  • A transient message is confirmed the moment it is enqueued.
  • A persistent message is confirmed as soon as it is persisted to disk, or when it is consumed on every queue.
  • An unroutable transient message is confirmed as soon as it is published.

For more information on publisher confirms, please read the announcement on the RabbitMQ blog.

To use publisher confirms, you must first create the publish channel with publisher confirms on:

var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms());

Next you must specify success and failure callbacks when you publish your message:

channel.Publish(message, x =>
    x.OnSuccess(() =>
    {
        // do success processing here
    })
    .OnFailure(() =>
    {
        // do failure processing here
    }));

Be careful not to dispose the publish channel before your call-backs have had a chance to execute.

Here's an example of a simple test. We're publishing 10,000 messages and then waiting for them all to be acknowledged before disposing the channel. There's a timeout, so if the batch takes longer than 10 seconds we abort with an exception.

const int batchSize = 10000;
var callbackCount = 0;
var stopwatch = new Stopwatch();
stopwatch.Start();

using (var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
{
    for (int i = 0; i < batchSize; i++)
    {
        var message = new MyMessage {Text = string.Format("Hello Message {0}", i)};
        channel.Publish(message, x =>
            x.OnSuccess(() =>
            {
                // callbacks may run on a different thread from this loop, so increment atomically
                Interlocked.Increment(ref callbackCount);
            })
            .OnFailure(() =>
            {
                Interlocked.Increment(ref callbackCount);
            }));
    }

    // wait until all the publications have been acknowledged.
    while (callbackCount < batchSize)
    {
        // TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
        if (stopwatch.Elapsed.TotalSeconds > 10)
        {
            throw new ApplicationException("Aborted batch with timeout");
        }
        Thread.Sleep(10);
    }
}
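
An alternative to polling a counter is to block on a CountdownEvent. The sketch below is only an illustration of that idea and does not use EasyNetQ: FakePublish is a hypothetical stand-in that invokes the callback from another thread, much as a confirm call-back might.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConfirmWaitSketch
{
    // Hypothetical stand-in for a publish with confirms:
    // invokes the callback from a thread-pool thread.
    static void FakePublish(Action onConfirmed)
    {
        Task.Factory.StartNew(onConfirmed);
    }

    // Returns true if every callback arrived within the timeout.
    public static bool PublishBatch(int batchSize, TimeSpan timeout)
    {
        using (var pending = new CountdownEvent(batchSize))
        {
            for (int i = 0; i < batchSize; i++)
            {
                // Each callback, success or failure, signals exactly once.
                FakePublish(() => pending.Signal());
            }

            // Blocks until the count reaches zero or the timeout elapses.
            return pending.Wait(timeout);
        }
    }
}
```

In real code you would also have to decide what happens to call-backs that arrive after a timeout, since the event (and the channel) may already have been disposed by then.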

Wednesday, October 10, 2012

A Functional IoC Container

Today I was idly thinking about an idea I had a couple of years ago for a functional IoC container. I’d had a go at implementing such a beast, but soon got bogged down in a tangled mess of spaghetti reflection code and gave it up as too much bother. But today it suddenly occurred to me that there was no need for any reflection voodoo; the C# type system is powerful enough to do all the work for us.

In object oriented programming languages we build programs from classes. Classes declare the contract(s) they support with interfaces and declare their dependencies with constructor arguments. We use an IoC container to wire instances of our classes together to make a running program.

Pure functional languages, like Haskell, don’t have any concept of class; instead they use currying and partial application to compose hierarchies of functions.

Here’s an example of a purely functional program written in C#.

public static class Module
{
    public static Data GetAndTransform(Func<Input,Data> dataAccsessor, Func<Data,Data> transformer, int id)
    {
        var input = new Input() {Id = id};
        var data = dataAccsessor(input);
        var transformed = transformer(data);
        return transformed;
    }

    public static Data DataAccsessor(Input input)
    {
        return new Data
        {
            Id = input.Id,
            Name = "Test"
        };
    }

    public static Data Transformer(Data original)
    {
        original.Name = original.Name + " transformed";
        return original;
    }
}

GetAndTransform simply takes an int id argument, does some work, and then returns some data. It needs a dataAccsessor and a transformer in order to do its job.

C# has no built-in syntax for currying or partial application, so the straightforward way to run it is to compose the program and execute it all in one step. For example:

var id = 10;
var data = Module.GetAndTransform(Module.DataAccsessor, Module.Transformer, id);

Console.Out.WriteLine("data.Id = {0}", data.Id);
Console.Out.WriteLine("data.Name = {0}", data.Name);
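
You can of course do the partial application by hand with a lambda, which is roughly the wiring a container would otherwise do for you. Here is a minimal sketch of that; the Input and Data classes are not shown in this post, so their shapes here are assumptions, and ManualComposition is a made-up name.

```csharp
using System;

// Assumed shapes for the types used in the example above.
public class Input { public int Id { get; set; } }
public class Data { public int Id { get; set; } public string Name { get; set; } }

public static class ManualComposition
{
    public static Data GetAndTransform(Func<Input, Data> accessor, Func<Data, Data> transformer, int id)
    {
        return transformer(accessor(new Input { Id = id }));
    }

    // Partial application by hand: fix the two dependencies, leave the id open.
    public static Func<int, Data> Compose()
    {
        Func<Input, Data> accessor = input => new Data { Id = input.Id, Name = "Test" };
        Func<Data, Data> transformer = data =>
        {
            data.Name = data.Name + " transformed";
            return data;
        };
        return id => GetAndTransform(accessor, transformer, id);
    }
}
```

Compose() returns a Func<int, Data> that can be built in one place and run in another, but every composition has to be written out by hand.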

But what if we had a ‘currying container’, one that could compose the program in one step and then return a function for us to execute in another? Here is such a container at work with our program:

var registration = new Container()
    .Register<Func<Input, Data>, Func<Data, Data>, int, Data>(Module.GetAndTransform)
    .Register<Input,Data>(Module.DataAccsessor)
    .Register<Data,Data>(Module.Transformer);

var main = registration.Get<Func<int, Data>>();

var data = main(10);

Console.Out.WriteLine("data.Id = {0}", data.Id);
Console.Out.WriteLine("data.Name = {0}", data.Name);

In the first line, we create a new instance of our container. On the next three lines we register our functions. Unfortunately C#’s type inference isn’t powerful enough to let us do away with the tedious type annotations; we have to explicitly declare the argument and return types of each of our functions.

Once our functions are registered we can ask the container for a program (main) that takes an int and returns a Data instance. The container works out that it needs to curry GetAndTransform and then partially apply DataAccsessor and Transformer to it to produce the desired function.

We can then run our ‘main’ function which gives us our expected output:

data.Id = 10
data.Name = Test transformed

The container turns out to be very simple, just a dictionary that’s keyed by type and contains a collection of constructor functions that know how to build the target (key) type.

public interface IRegistration
{
    void Add(Type target, Func<object> constructor);
    T Get<T>();
}

public class Container : IRegistration
{
    private readonly Dictionary<Type, Func<object>> registrations = new Dictionary<Type, Func<object>>();

    public void Add(Type target, Func<object> constructor)
    {
        registrations.Add(target, constructor);
    }

    public T Get<T>()
    {
        return (T)registrations[typeof (T)]();
    }
}

The magic sauce is in the Register extension method overloads. If you take the standard functional idea that a function should only have one argument and one return type, you can take any input function, curry it, and then partially apply arguments until you are left with a Func<X,Y>. So you know what the ‘target’ type of each function should be: a function from the last argument to the return type. A Func<A,B,C,R> gets resolved to a Func<C,R>. There’s no need to explicitly register a target; it’s implicit from the type of the provided function:

public static class RegistrationExtensions
{
    public static IRegistration Register<A,R>(this IRegistration registration, Func<A, R> source)
    {
        var targetType = typeof (Func<A, R>);
        var curried = Functional.Curry(source);

        registration.Add(targetType, () => curried);

        return registration;
    }

    public static IRegistration Register<A,B,R>(this IRegistration registration, Func<A, B, R> source)
    {
        var targetType = typeof (Func<B, R>);
        var curried = Functional.Curry(source);

        registration.Add(targetType, () => curried(
            registration.Get<A>()
            ));

        return registration;
    }

    public static IRegistration Register<A, B, C, R>(this IRegistration registration, Func<A, B, C, R> source)
    {
        var targetType = typeof(Func<C, R>);
        var curried = Functional.Curry(source);

        registration.Add(targetType, () => curried(
            registration.Get<A>()
            )
            (
            registration.Get<B>()
            ));

        return registration;
    }
}

Each overload deals with an input function with a different number of arguments. My simple experiment only works with functions with up to three arguments (two dependencies and an input type), but it would be easy to extend for higher numbers. The Curry function is stolen from Oliver Sturm and looks like this:

public static class Functional
{
    public static Func<A, R> Curry<A, R>(Func<A, R> input)
    {
        return input;
    }

    public static Func<A, Func<B, R>> Curry<A, B, R>(Func<A, B, R> input)
    {
        return a => b => input(a, b);
    }

    public static Func<A, Func<B, Func<C,R>>> Curry<A, B, C, R>(Func<A, B, C, R> input)
    {
        return a => b => c => input(a, b, c);
    }
}

Rather nice, even if I say so myself.

Of course this little experiment has many limitations. For a start it only understands functions in terms of Func< … >, so you can’t have more than one function of each ‘type’. You couldn’t have two Func<int,int> for example, which might be somewhat limiting.
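
The ‘one function per type’ limitation falls straight out of the dictionary. This little sketch, using a made-up DuplicateRegistrationDemo class rather than the container itself, mirrors the container’s storage and shows that a second Func<int,int> registration is rejected by Dictionary.Add:

```csharp
using System;
using System.Collections.Generic;

public static class DuplicateRegistrationDemo
{
    // Mirrors the container's storage: one constructor function per target type.
    // Returns true if the second registration of the same type is rejected.
    public static bool SecondRegistrationFails()
    {
        var registrations = new Dictionary<Type, Func<object>>();
        Func<int, int> doubler = x => x * 2;
        Func<int, int> squarer = x => x * x;

        registrations.Add(typeof(Func<int, int>), () => doubler);
        try
        {
            // Same key type, so Dictionary.Add throws ArgumentException.
            registrations.Add(typeof(Func<int, int>), () => squarer);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }
}
```

Getting around this would need some extra way of telling the two functions apart, such as a name or a distinct delegate type, which this experiment doesn’t attempt.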

The code is on GitHub here if you want to have a play.

Wednesday, October 03, 2012

EasyNetQ Cluster Support

EasyNetQ, my super simple .NET API for RabbitMQ, now (from version 0.7.2.34) supports RabbitMQ clusters without any need to deploy a load balancer.

Simply list the nodes of the cluster in the connection string ...

var bus = RabbitHutch.CreateBus("host=ubuntu:5672,ubuntu:5673");

In this example I have set up a cluster on a single machine, 'ubuntu', with node 1 on port 5672 and node 2 on port 5673. When the CreateBus statement executes, EasyNetQ will attempt to connect to the first host listed (ubuntu:5672). If it fails to connect it will attempt to connect to the second host listed (ubuntu:5673). If neither node is available it will sit in a re-try loop attempting to connect to both servers every five seconds. It logs all this activity to the registered IEasyNetQLogger. You might see something like this if the first node was unavailable:

DEBUG: Trying to connect
ERROR: Failed to connect to Broker: 'ubuntu', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable'
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/'

If the node that EasyNetQ is connected to fails, EasyNetQ will attempt to connect to the next listed node. Once connected, it will re-declare all the exchanges and queues and re-start all the consumers. Here's an example log record showing one node failing then EasyNetQ connecting to the other node and recreating the subscribers:

INFO: Disconnected from RabbitMQ Broker
DEBUG: Trying to connect
DEBUG: OnConnected event fired
DEBUG: Re-creating subscribers
INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/'

You get automatic fail-over out of the box. That’s pretty cool.

If you have multiple services using EasyNetQ to connect to a RabbitMQ cluster, they will all initially connect to the first listed node in their respective connection strings. For this reason the EasyNetQ cluster support is not really suitable for load balancing high throughput systems. I would recommend that you use a dedicated hardware or software load balancer instead, if that’s what you want.
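
The ‘try each host in order’ behaviour described above can be sketched in a few lines. This is an illustration of the idea only, not EasyNetQ’s implementation: FailoverSketch, ParseHosts and the tryConnect callback are all hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class FailoverSketch
{
    // Splits the host list out of the value part of "host=a:1,b:2".
    public static IList<string> ParseHosts(string hostValue)
    {
        return hostValue.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
    }

    // Tries each host in listed order and returns the first that connects,
    // or null if none do. tryConnect is a hypothetical stand-in for opening
    // a broker connection.
    public static string ConnectToFirstAvailable(IList<string> hosts, Func<string, bool> tryConnect)
    {
        foreach (var host in hosts)
        {
            if (tryConnect(host))
            {
                return host;
            }
        }
        return null;
    }
}
```

A caller that gets null back would wait and try the whole list again. Because every client starts from the first listed host, this gives fail-over but not load balancing.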

Friday, September 28, 2012

Parsing a Connection String With Sprache

Sprache is a very cool lightweight parser library for C#. Today I was experimenting with parsing EasyNetQ connection strings, so I thought I’d have a go at getting Sprache to do it. An EasyNetQ connection string is a list of key-value pairs like this:

key1=value1;key2=value2;key3=value3

The motivation for looking at something more sophisticated than simply chopping strings based on delimiters is that I’m thinking of having more complex values that would themselves need parsing. But that’s for the future; today I’m just going to parse a simple connection string where the values can be strings or numbers (ushort to be exact).

So, I want to parse a connection string that looks like this:

virtualHost=Copa;username=Copa;host=192.168.1.1;password=abc_xyz;port=12345;requestedHeartbeat=3

… into a strongly typed structure like this:

public class ConnectionConfiguration : IConnectionConfiguration
{
    public string Host { get; set; }
    public ushort Port { get; set; }
    public string VirtualHost { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public ushort RequestedHeartbeat { get; set; }
}

I want it to be as easy as possible to add new connection string items.

First let’s define a name for a function that updates a ConnectionConfiguration. An uncommonly used form of the ‘using’ directive allows us to give a short name to a complex type:

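// An alias target is not resolved through sibling using directives, so Func is qualified here;
// ConnectionConfiguration may need its namespace too, depending on where the alias is declared.
using UpdateConfiguration = System.Func<ConnectionConfiguration, ConnectionConfiguration>;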

Now let’s define a little function that creates a Sprache parser for a key-value pair. We supply the key and a parser for the value and get back a parser that can update the ConnectionConfiguration.

public static Parser<UpdateConfiguration> BuildKeyValueParser<T>(
    string keyName,
    Parser<T> valueParser,
    Expression<Func<ConnectionConfiguration, T>> getter)
{
    return
        from key in Parse.String(keyName).Token()
        from separator in Parse.Char('=')
        from value in valueParser
        select (Func<ConnectionConfiguration, ConnectionConfiguration>)(c =>
        {
            CreateSetter(getter)(c, value);
            return c;
        });
}

CreateSetter is a little function that turns a property expression (like x => x.Name) into an Action<TTarget, TProperty>.
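
CreateSetter isn’t shown in this post, so here is one way it might be written. Treat it as a sketch under that assumption: it uses reflection (PropertyInfo.SetValue) and only handles simple property expressions. SetterSketch and Sample are made-up names for illustration.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

public static class SetterSketch
{
    // Turns a property expression such as x => x.Name into a setter delegate.
    public static Action<TTarget, TProperty> CreateSetter<TTarget, TProperty>(
        Expression<Func<TTarget, TProperty>> getter)
    {
        // x => x.Name has a MemberExpression body whose Member is the property.
        var member = getter.Body as MemberExpression;
        var property = member == null ? null : member.Member as PropertyInfo;
        if (property == null)
        {
            throw new ArgumentException("Expected a simple property expression like x => x.Name");
        }

        return (target, value) => property.SetValue(target, value, null);
    }
}

// Small type used only to demonstrate the setter.
public class Sample
{
    public string Name { get; set; }
}
```

With this in place, SetterSketch.CreateSetter<Sample, string>(x => x.Name)(sample, "abc") sets sample.Name to "abc".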

Next let’s define parsers for string and number values:

public static Parser<string> Text = Parse.CharExcept(';').Many().Text();
public static Parser<ushort> Number = Parse.Number.Select(ushort.Parse);

Now we can chain a series of BuildKeyValueParser invocations and Or them together so that we can parse any of our expected key-values:

public static Parser<UpdateConfiguration> Part = new List<Parser<UpdateConfiguration>>
{
    BuildKeyValueParser("host", Text, c => c.Host),
    BuildKeyValueParser("port", Number, c => c.Port),
    BuildKeyValueParser("virtualHost", Text, c => c.VirtualHost),
    BuildKeyValueParser("requestedHeartbeat", Number, c => c.RequestedHeartbeat),
    BuildKeyValueParser("username", Text, c => c.UserName),
    BuildKeyValueParser("password", Text, c => c.Password),
}.Aggregate((a, b) => a.Or(b));

Each invocation of BuildKeyValueParser defines an expected key-value pair of our connection string. We just give the key name, the parser that understands the value, and the property on ConnectionConfiguration that we want to update. In effect we’ve defined a little DSL for connection strings. If I want to add a new connection string value, I simply add a new property to ConnectionConfiguration and a single line to the above code.

Now let’s define a parser for the entire string, by saying that we’ll parse any number of key-value parts:

public static Parser<IEnumerable<UpdateConfiguration>> ConnectionStringBuilder =
    from first in Part
    from rest in Parse.Char(';').Then(_ => Part).Many()
    select Cons(first, rest);
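
The Cons helper isn’t shown here either. Assuming it simply prepends the first parsed part to the rest, a minimal version could look like this (SequenceHelpers is a made-up class name):

```csharp
using System.Collections.Generic;

public static class SequenceHelpers
{
    // Prepends a single item to a sequence, lazily.
    public static IEnumerable<T> Cons<T>(T head, IEnumerable<T> rest)
    {
        yield return head;
        foreach (var item in rest)
        {
            yield return item;
        }
    }
}
```

So Cons(1, new[] { 2, 3 }) yields 1, 2, 3 in that order.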

All we have to do now is parse the connection string and apply the chain of update functions to a ConnectionConfiguration instance:

public IConnectionConfiguration Parse(string connectionString)
{
    var updater = ConnectionStringGrammar.ConnectionStringBuilder.Parse(connectionString);
    return updater.Aggregate(new ConnectionConfiguration(), (current, updateFunction) => updateFunction(current));
}

We get lots of nice things out of the box with Sprache. One of the best is the excellent error messages:

Parsing failure: unexpected 'x'; expected host or port or virtualHost or requestedHeartbeat or username or password (Line 1, Column 1).

Sprache is really nice for this kind of task. I’d recommend checking it out.

Tuesday, September 25, 2012

Replacing EasyNetQ Components

EasyNetQ, my simple .NET API for RabbitMQ, is a library composed of small components. Until today, the code simply wired the components together in a messy hard-coded routine. Now it has its own tiny internal IoC container. When you write:
var bus = RabbitHutch.CreateBus("host=localhost");

... the static method CreateBus registers the components with the container and then resolves the IBus instance. The really cool thing about this is that it allows you, the user, to replace any of the internal components, including IBus, with your own implementations. An overload of the CreateBus method provides the hook which gives you access to the component registration. The signature looks like this:
public static IBus CreateBus(string connectionString, Action<IServiceRegister> registerServices)

The IServiceRegister interface provides a single method:
public interface IServiceRegister
{
    IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator) where TService : class;
}

So to register your own logger, based on IEasyNetQLogger, you'd write this code:
var logger = new MyLogger(); // MyLogger implements IEasyNetQLogger
var bus = RabbitHutch.CreateBus(connectionString, 
    serviceRegister => serviceRegister.Register(serviceProvider => logger));

The Register method's argument, Func<IServiceProvider, TService>, is a function that's run when CreateBus pulls together the components to make an IBus instance. IServiceProvider looks like this:
public interface IServiceProvider
{
    TService Resolve<TService>() where TService : class;
}

This allows you to access other services that EasyNetQ provides. If for example you wanted to replace the default serializer with your own implementation of ISerializer, and you wanted to construct it with a reference to the internal logger, you could do this:
var bus = RabbitHutch.CreateBus(connectionString, serviceRegister => serviceRegister.Register(
    serviceProvider => new MySerializer(serviceProvider.Resolve<IEasyNetQLogger>())));

There’s nothing to stop you registering your own interfaces with the container that you can then use with your implementations of EasyNetQ’s service interfaces.
To see the complete list of components that make up the IBus instance, and how they are assembled, take a look at the ComponentRegistration class.
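
To make the registration mechanics concrete, here is a minimal sketch of a container that implements the two interfaces shown above. It is not EasyNetQ's actual container: TinyContainer, ILogger, PrefixLogger and Greeter are hypothetical names, and the "first registration wins" rule is an assumption chosen so that a user's registration takes priority over a default registered later.

```csharp
using System;
using System.Collections.Generic;

namespace ContainerSketch
{
    // The two interfaces below are copied from the post.
    public interface IServiceProvider
    {
        TService Resolve<TService>() where TService : class;
    }

    public interface IServiceRegister
    {
        IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator)
            where TService : class;
    }

    // Hypothetical container, for illustration only.
    public class TinyContainer : IServiceProvider, IServiceRegister
    {
        private readonly Dictionary<Type, object> factories = new Dictionary<Type, object>();
        private readonly Dictionary<Type, object> instances = new Dictionary<Type, object>();

        public IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator)
            where TService : class
        {
            // Assumption: the first registration wins, later ones are ignored.
            if (!factories.ContainsKey(typeof(TService)))
            {
                factories.Add(typeof(TService), serviceCreator);
            }
            return this;
        }

        public TService Resolve<TService>() where TService : class
        {
            object instance;
            if (!instances.TryGetValue(typeof(TService), out instance))
            {
                object factory;
                if (!factories.TryGetValue(typeof(TService), out factory))
                {
                    throw new InvalidOperationException("No registration for " + typeof(TService).Name);
                }
                // The factory receives the container, so it can resolve its own dependencies.
                instance = ((Func<IServiceProvider, TService>)factory)(this);
                instances.Add(typeof(TService), instance);
            }
            return (TService)instance;
        }
    }

    // Hypothetical services used to exercise the container.
    public interface ILogger
    {
        string Prefix { get; }
    }

    public class PrefixLogger : ILogger
    {
        public PrefixLogger(string prefix) { Prefix = prefix; }
        public string Prefix { get; private set; }
    }

    public class Greeter
    {
        public Greeter(ILogger logger) { Logger = logger; }
        public ILogger Logger { get; private set; }
    }
}
```

In this sketch, registering a PrefixLogger("custom") before a PrefixLogger("default") and then resolving Greeter yields a Greeter whose logger is the custom one, which mirrors the idea of user registrations replacing the defaults.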

Friday, September 14, 2012

Return a Task From BeginExecuteNonQuery

Blog as notepad time. Just a little reminder for myself on how to return the result from BeginExecuteNonQuery as a Task<int>:

public Task<int> Save(string value)
{
    var taskCompletionSource = new TaskCompletionSource<int>();

    var connection = new SqlConnection(connectionString);
    connection.Open();
    var command = new SqlCommand("uspSaveSomeValue", connection)
    {
        CommandType = CommandType.StoredProcedure
    };
    command.Parameters.AddWithValue("@myparam", value);
    command.BeginExecuteNonQuery(asyncResult =>
    {
        try
        {
            // EndExecuteNonQuery rethrows any error raised while the command ran.
            taskCompletionSource.SetResult(command.EndExecuteNonQuery(asyncResult));
        }
        catch (Exception exception)
        {
            // Fault the task instead of leaving it incomplete forever.
            taskCompletionSource.SetException(exception);
        }
        finally
        {
            command.Dispose();
            connection.Dispose();
        }
    }, null);

    return taskCompletionSource.Task;
}

If you know a better way, please comment below.

Yes, yes, I know, but I’m not working on 4.5 yet  :(

Update: Ken Egozi suggested using Task<int>.Factory.FromAsync. Of course! I’d been doing so much TaskCompletionSource manual task creation recently that I’d forgotten about this useful shortcut. Here’s a more succinct version using FromAsync:

public Task<int> Save(string value)
{
    var connection = new SqlConnection(connectionString);
    connection.Open();
    var command = new SqlCommand("uspSaveSomeValue", connection)
    {
        CommandType = CommandType.StoredProcedure
    };
    command.Parameters.AddWithValue("@myparam", value);

    return Task<int>.Factory.FromAsync(command.BeginExecuteNonQuery(), asyncResult =>
    {
        try
        {
            return command.EndExecuteNonQuery(asyncResult);
        }
        finally
        {
            // Clean up even when the command fails.
            command.Dispose();
            connection.Dispose();
        }
    });
}
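
Both approaches can be tried without a database by swapping in any other Begin/End pair. The sketch below assumes Stream.BeginRead and Stream.EndRead as a stand-in for the SqlCommand methods; the ApmToTaskSketch class and its method names are hypothetical. It also uses the FromAsync overload that takes the Begin and End methods themselves, rather than an IAsyncResult that has already been started.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ApmToTaskSketch
{
    // Manual wrapping with TaskCompletionSource, in the style of the first Save method.
    public static Task<int> ReadWithCompletionSource(Stream stream, byte[] buffer)
    {
        var taskCompletionSource = new TaskCompletionSource<int>();
        stream.BeginRead(buffer, 0, buffer.Length, asyncResult =>
        {
            try
            {
                // EndRead returns the number of bytes read, or rethrows a failure.
                taskCompletionSource.SetResult(stream.EndRead(asyncResult));
            }
            catch (Exception exception)
            {
                // Surface the failure through the task.
                taskCompletionSource.SetException(exception);
            }
        }, null);
        return taskCompletionSource.Task;
    }

    // The same operation using FromAsync with the Begin/End method pair.
    public static Task<int> ReadWithFromAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }
}
```

For the SqlCommand case the equivalent call would presumably be Task<int>.Factory.FromAsync(command.BeginExecuteNonQuery, command.EndExecuteNonQuery, null), with disposal of the command and connection moved into a continuation on the returned task.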