Monday, December 24, 2012

A Geek Christmas Quiz–The Answers!

Thanks to everyone who had a go at my Geek Christmas Quiz. The response was fantastic, with both Iain Holder and Rob Pickering emailing me their answers. I’m pretty sure neither of them Googled any of the questions, since their scores weren’t spectacular :)

So, now the post you’ve all been waiting for with such anticipation… the answers!

Computers

  1. G.N.U stands for GNU’s Not Unix. A recursive acronym, how geeky is that?
  2. The A in ARM originally stood for ‘Acorn’ as in Acorn Risc Machine. Yes, I know it stands for ‘Advanced’ now, but the question said ‘originally’.
  3. TCP stands for Transmission Control Protocol.
  4. Paul Allen founded Microsoft with Bill Gates. I’ve just finished reading his memoir ‘Idea Man’. Hard work!
  5. F2 (hex) is 15 (F) * 16 + 2 = 242, or 1111 0010 in binary.
  6. Windows ME was based on the Ballmer Peak theory of software development.
  7. The first programmer was Ada Lovelace. Yes yes, I know that’s contentious, but I made up this quiz, so there!
  8. UNIX time started in 1970 (1st January to be exact). I know this because I just had to write a System.DateTime to UNIX time converter.
  9. SGI, the mostly defunct computer maker. You get a mark for Silicon Graphics International (or Inc).
  10. Here’s monadic ‘Bind’ in C#: M<B> Bind<A,B>(M<A> a, Func<A, M<B>> func)
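Answer 8 mentions a System.DateTime to UNIX time converter. The original isn't shown, so the following is only a minimal sketch of how such a converter might look. The class and method names (UnixTime, FromDateTime, ToDateTime) are made up for illustration, and it assumes whole seconds are precise enough.

```csharp
using System;

public static class UnixTime
{
    // The UNIX epoch: midnight UTC on 1st January 1970.
    private static readonly DateTime Epoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Seconds elapsed since the epoch, with any fraction of a second dropped.
    // ToUniversalTime treats a DateTime of unspecified kind as local time.
    public static long FromDateTime(DateTime dateTime)
    {
        return (long)(dateTime.ToUniversalTime() - Epoch).TotalSeconds;
    }

    // The UTC DateTime that lies the given number of seconds after the epoch.
    public static DateTime ToDateTime(long unixSeconds)
    {
        return Epoch.AddSeconds(unixSeconds);
    }
}
```

For example, UnixTime.FromDateTime(new DateTime(2012, 12, 24, 0, 0, 0, DateTimeKind.Utc)) gives 1356307200.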

Name That Geek!

[Name_that_geek.png]

  1. Bill Gates – Co-founder of Microsoft with Paul Allen.
  2. Tim Berners-Lee – Creator of the World Wide Web.
  3. Larry Ellison – Founder of Oracle. Lives in a Samurai House (how geeky is that?)
  4. Linus Torvalds – Creator of Linux.
  5. Alan Turing – Mathematician and computer scientist. Described the Turing Machine. Helped save the free world from the Nazis.
  6. Steve Jobs – Founded Apple, NeXT and Pixar.
  7. Natalie Portman – Actress and self-confessed geek.

Science

  1. The four ‘letters’ of DNA are C, G, T and A. If you know the actual names of the nucleotides (guanine, adenine, thymine, and cytosine), give yourself a bonus point – you really are a DNA geek!
  2. The ‘c’ in E = mc² is a constant, the speed of light.
  3. The next number in the Fibonacci sequence 1 1 2 3 5 8 is 13 (5 + 8).
  4. C8H10N4O2 is the chemical formula for caffeine.
  5. According to Wikipedia, Australopithecus, the early hominid, became extinct around 2 million years ago.
  6. You would not find an electron in an atomic nucleus.
  7. Nitrogen is the most common gas in the Earth’s atmosphere.
  8. The formula for Ohm’s Law is I = V/R (current = voltage / resistance).
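  9. A piece of paper that, when folded in half, maintains the ratio between the length of its sides, has sides with a length ratio of √2, roughly 1.414. That’s the ratio used by A4 and the other A-series paper sizes. It isn’t the golden ratio (1.618): that’s the ratio a rectangle keeps when you cut a square off it. Did you know that the ratio between successive Fibonacci sequence numbers tends to the golden ratio? Maths is awesome!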
  10. The closest living land mammal to the cetaceans (including whales) is the Hippopotamus.

Space

  1. The second stage of the Apollo Saturn V moon rocket was powered by five J-2 rocket engines. The third stage used a single J-2.
  2. Saturn’s largest moon is Titan. Also the only moon in the solar system (other than our own) that a spaceship has landed on.
  3. You would experience 1/6th of the Earth’s gravity on the moon. Or thereabouts.
  4. This question proved most contentious. The answer is false, there is nowhere in space that has no gravity. Astronauts are weightless because they are in free-fall. Gravity itself is a property of space.
  5. A Geosynchronous spaceship has an orbital period of 24 hours. So it appears to be stationary to a ground observer.
  6. The furthest planet from the sun is Neptune. Far fewer people know this than know that Pluto used to be the furthest planet from the sun. Actually Pluto was only the furthest for part of its irregular orbit.
  7. There are currently 6 people aboard the International Space Station.
  8. According to Google (yes, I know) there are 13,000 earth satellites.
  9. Prospero was the only satellite built and launched by the UK. It was launched by stealth after the programme had been cancelled; that’s the way we do things in the UK.
  10. The second man on the moon was Buzz Aldrin. He’s never forgiven NASA.

Name That Spaceship!

[Name_that_spaceship.png]

In this round, give yourself a point if you can name the film or TV series the fictional spacecraft appeared in.

  1. Red Dwarf. Sorry, you probably have to be British to get this one.
  2. Space 1999. Sorry, you really have to be British and over 40 to get this one … or a major TV space geek.
  3. Voyager. Difficult, interplanetary probes all look similar.
  4. Apollo Lunar Excursion Module (LEM). You can have a point for ‘Lunar Module’, but no, you don’t get a point for ‘Apollo’. Call yourself a geek?
  5. Skylab. The first US space station, made out of old Apollo parts. Not many people get this one. I read a whole book about it, that’s how much of a space geek I am.
  6. Darth Vader’s TIE fighter. You can have a point for ‘TIE Fighter’. You can’t have a point for ‘Star Wars’. Yes yes, I know I’m contradicting myself, but, come on, every geek should know this.
  7. Curiosity. No, no points for ‘Mars Rover’.
  8. 2001 A Space Odyssey. Even I don’t know what the ship is called.
  9. Soyuz. It’s been used by the Russians to travel into space since 1966. 46 years! It’s almost as old as me. Odd, when space travel is so synonymous with high-technology, that much of the hardware is actually ancient.

Geek Culture

  1. ‘Spooky’ Mulder was the agent in the X-Files, played by actor David Duchovny.
  2. Kiki is the trainee witch in ‘Kiki’s Delivery Service’, one of my favourite anime movies by the outstanding Studio Ghibli.
  3. The actual quote: “Humans are a disease, a cancer of this planet.” by Agent Smith. You can have a point for Virus or Cancer too. Thanks Chris for the link and clarification.
  4. Spider-Man of course!
  5. “It’s a Banana” Kryten, of Red Dwarf, learns to lie.
  6. My wife, who is Japanese, translates ‘Otaku’ as ‘geek’. Literally it means ‘you’ and is used to describe someone with obsessive interests. An appropriate question for a geek quiz I think.
  7. The name R2D2 apparently came about when Lucas heard someone ask for Reel 2 Dialog Track 2 in the abbreviated form ‘R-2-D-2’. Later it was said to stand for Second Generation Robotic Droid Series 2. You can have a point for either.
  8. Clarke’s 3rd law states: “Any sufficiently advanced technology is indistinguishable from magic.”
  9. African or European? From Monty Python and the Holy Grail.
  10. Open the pod bay doors please HAL. 2001 A Space Odyssey. Or on acid here.

So there you are. I hope you enjoyed it, and maybe even learnt a little. I certainly did. I might even do it again next year.

A very Merry Christmas to you all!

Friday, December 21, 2012

A Geek Christmas Quiz

God rest ye merry gentlemen! Welcome to my 2012 Geek Christmas Quiz. Every Friday morning at 15below we have a ‘DevEd’ session. Usually this is a presentation about some interesting tech, or a new way we want to do something at the company, but today I thought I would try to gauge the true geekiness of our development team with a quiz. The winners, and therefore crowned top geeks, were Toby and Linda who got a total of 32 points. See if you can do better dear reader.

You get one point for each correct answer. The quiz is split into six sections: Computers, ‘Name That Geek’, Science, Space, ‘Name That Spaceship’, and Geek Culture.

Update: The answers are here.

Computers

  1. What does G.N.U. stand for?
  2. What did the A in ARM originally stand for?
  3. What does TCP stand for?
  4. Who founded Microsoft with Bill Gates?
  5. What is F2 (hexadecimal) in decimal?
  6. Which operating system's development was based on the 'Ballmer Peak'?
  7. Who was the first programmer?
  8. What year does UNIX time start?
  9. What did SGI stand for?
  10. Write down the type signature of the Monadic Bind method.

Name that Geek

Name_that_geek

Science

  1. What are the four letters of DNA?
  2. What does the 'c' in E = mc² stand for?
  3. What is the next number in this sequence: 1 1 2 3 5 8 _ ?
  4. What is C8 H10 N4 O2 ?
  5. When did Australopithecus become extinct? (in millions of years ago)
  6. Which of the following would you not expect to find in an atomic nucleus (electron, neutron, proton)
  7. What is the most common gas in the Earth's atmosphere?
  8. Write the formula for Ohm's law.
  9. If, after you fold a piece of paper in half, the ratio between its longest side and its shortest side is the same, what is that ratio?
  10. What living land mammal is the closest evolutionary relative to Whales? (cetaceans)

Space

  1. What rocket engine powered the 2nd stage of the Saturn V?
  2. What is Saturn's largest moon?
  3. What fraction of the Earth's gravity would you experience on the moon?
  4. Astronauts are weightless in space because there is no gravity. true or false?
  5. What is the orbital period of a geosynchronous satellite?
  6. What is the furthest planet from the sun? (now that Pluto has been demoted)
  7. How many people are currently living aboard the ISS?
  8. To the nearest thousand, how many satellites are currently orbiting the earth?
  9. What was the name of the only satellite launched by the UK?
  10. Who was the second man on the moon?

Name that spaceship

Name_that_spaceship

Geek Culture

  1. Who was Spooky Mulder?
  2. Was Kiki a trainee witch or an evil princess?
  3. "Humans are a _____" (Agent Smith)
  4. Who is Peter Parker?
  5. "It's a b... It's a b... It's a small, off-duty Czechoslovakian traffic warden!" What is it really?
  6. What does 'Otaku' (Japanese) mean?
  7. What does R2D2 stand for?
  8. What is Clarke's 3rd law?
  9. What is the air speed velocity of an unladen swallow?
  10. Open ___ ___ ___ ____ please H.A.L

Tuesday, December 18, 2012

My Super Simple Node Twitter Re-Tweeter

I’ve been having a lot of fun writing a little ‘re-tweeter’ this morning. We basically want to monitor our user stream and then re-tweet any status with a particular hash tag. I thought this would be an excellent little project for node, and indeed it proved to be extremely easy to do. I used the node-twitter library which worked fine for what I wanted to do.

If you want to use this code, you’ll need to do the following:

First you’ll need to go to https://dev.twitter.com/apps and register a new app. You can then copy and paste your consumer key, consumer secret, access token and access token secret into the ‘xxx’ fields.

Next install node-twitter with npm:

npm install twitter

Then just run the code with node (I’m a poet and I didn’t know it):

node twitter-retweeter.js

Here’s the code in the twitter-retweeter.js file:

var util = require('util');
var twitter = require('twitter');

// paste the credentials from your registered Twitter app here
var twit = new twitter({
    consumer_key: 'xxx',
    consumer_secret: 'xxx',
    access_token_key: 'xxx',
    access_token_secret: 'xxx'
});

// statuses containing this hashtag get re-tweeted
var hashtag = '#iloveprog';

// log each stream event, and check statuses for the hashtag
function write(data) {
    if (typeof data === 'string') {
        console.log(data);
    }
    else if (data.text && data.user && data.user.screen_name) {
        console.log(data.user.screen_name + ": " + data.text);
        testForHashtag(data);
    }
    else if (data.delete) {
        console.log('DELETE');
    }
    else if (data.message) {
        console.log('ERROR: ' + data.message);
    }
    else {
        console.log(util.inspect(data));
    }
}

// re-tweet the status if it contains the hashtag and isn't already re-tweeted
function testForHashtag(data) {
    if (data.retweeted) return;
    if (data.text.indexOf(hashtag) !== -1) {
        twit.retweetStatus(data.id_str, function() {
            console.log('retweet callback');
        });
    }
}

// wait a second, then reopen the stream
function reconnect() {
    setTimeout(startStreaming, 1000);
}

function startStreaming() {
    twit.stream('user', function(stream) {
        console.log('starting stream');
        stream.on('data', write);
        stream.on('end', reconnect);
    });
}

startStreaming();

console.log('listening for tweets');

It’s all really straightforward. The startStreaming function kicks off the callback on the twitter user stream. Each time an event occurs it calls the write function which checks for the given hashtag and then retweets the status if there’s a match.

Lovely!

Wednesday, December 05, 2012

WebRequest Throws On 404 Status Code

WebRequest, or rather HttpWebRequest, has the annoying behaviour of throwing a WebException when the server returns a 404 ‘not found’ status, or in fact any unexpected status code. It would be much better if it didn’t do this and simply allowed the application to decide what it should do on different status codes. At the very least there should be some way of turning this behaviour on or off. In fact it would be nice if the whole WebRequest class wasn’t a monolithic black box, but a toolbox of components that allowed you to tailor an HTTP client to your requirements. I was a little surprised when I did some Googling earlier and couldn’t find a nice open source alternative to WebRequest; it’s the sort of thing that the community is usually quite good at coding around. Oh well, I’ll add that to my ever growing list of potential future GitHub projects (that will never happen).

My quick and dirty fix for this problem was an extension method that catches the WebException, checks whether its Status is ProtocolError (this seems to be the status used for status code exceptions), and then returns the response from the exception’s Response property. It’s horrible, but it seems to work:

public static HttpWebResponse GetHttpResponse(this HttpWebRequest request)
{
    HttpWebResponse response = null;

    try
    {
        response = (HttpWebResponse)request.GetResponse();
    }
    catch (WebException exception)
    {
        if (exception.Status == WebExceptionStatus.ProtocolError)
        {
            response = (HttpWebResponse) exception.Response;
        }
        else
        {
            throw;
        }
    }

    return response;
}

It conveniently returns an HttpWebResponse instead of a WebResponse.

You could use it like this …

var response = request.GetHttpResponse();
if (response.StatusCode != HttpStatusCode.NotFound)
{
    // handle appropriately
}

Of course, if you want to handle the response asynchronously, you’ll have to write an extension method for EndGetResponse as well.
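As a rough sketch of what that asynchronous counterpart might look like, the extension method below wraps EndGetResponse in the same way. The names EndGetHttpResponse and TryGetErrorResponse are invented for this example, and the check for a null Response is an extra precaution that the synchronous version above doesn't take.

```csharp
using System;
using System.Net;

public static class HttpWebRequestAsyncExtensions
{
    // True when the exception carries an HTTP response (a 404, say),
    // in which case that response is handed back through the out parameter.
    public static bool TryGetErrorResponse(WebException exception, out HttpWebResponse response)
    {
        response = exception.Status == WebExceptionStatus.ProtocolError
            ? exception.Response as HttpWebResponse
            : null;
        return response != null;
    }

    // Call this from the BeginGetResponse callback instead of EndGetResponse.
    public static HttpWebResponse EndGetHttpResponse(this HttpWebRequest request, IAsyncResult asyncResult)
    {
        try
        {
            return (HttpWebResponse)request.EndGetResponse(asyncResult);
        }
        catch (WebException exception)
        {
            HttpWebResponse response;
            if (TryGetErrorResponse(exception, out response))
            {
                return response;
            }
            // not a status code problem (timeout, DNS failure and so on), so rethrow
            throw;
        }
    }
}
```

The status check lives in its own method so that it can be exercised without making a network call.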

… or, if you’re using .NET 4.5, you could use HttpClient. It wraps WebRequest internally, but it does return status codes rather than throwing it seems.

Tuesday, December 04, 2012

The Onion Of Compromise

yellow_onion

I love little phrases that sum up large scale behaviours in software systems and the organisations that produce them. One of my favourites is “The Onion Of Compromise.” I first heard this gem from my excellent friend Iain Holder. Iain doesn’t claim to be the author, that honour goes to a mysterious third person named ‘Mike’.

Being a programmer is all about making decisions. Lots and lots of little decisions. In fact every line of code is a decision; a little cog in the wheel of a grander machine. The simple thing that separates a good programmer from a poor programmer is that the good one tends to make relatively more good decisions and fewer bad ones.

Incidentally, that’s why it’s a mistake to think that you can hire an experienced ‘chief architect’ who ‘designs’ your system, while rooms full of junior/cheap developers churn out the code - and expect anything other than a disaster to occur. The decisions are just too granular to be made by one person on a large project.

Good decisions are ones which aid the evolution and stability of an application. They are summed up by epithets that describe general good practice, such as ‘Don’t Repeat Yourself’, ‘Open Closed Principle’ and a hundred others. An experienced programmer will employ a range of these rules-of-thumb to ensure that they don’t get tangled up in needless complexity as their application grows. You can tell a project where good decisions have been made; it’s easy to add new features and there are few bugs.

A bad decision often doesn’t seem like a bad decision at first, merely a way of implementing a feature or fixing a bug with the least possible impact on the code. Often the bad decision will introduce a constraint on further evolution of the software or a special case given a particular combination of factors. If a bad decision isn’t rolled back it can quickly lead to further bad decisions as the programmer works around it. Soon layers of poor design wrap that initial poor decision. This is ‘The Onion of Compromise’. That initial first mistake (or compromise) leads to a cascade of poor choices. Another name for the layers of the onion is ‘Technical Debt’.

It’s easy to spot software that has suffered from The Onion of Compromise; it’s brittle, you change one thing and it breaks seemingly unrelated parts of the system; it seems to take ages to implement new features; and there’s a high bug count.

Monday, November 26, 2012

RabbitMQ On Windows With .NET, A Case Study

Any reader of this blog will know that my big project over the last year has been to create a simple .NET API for RabbitMQ called EasyNetQ. I’ve been working as a software architect at 15Below for the last year and a half. The prime motivation for writing EasyNetQ was so that our developers would have an easy API for working with RabbitMQ on .NET. I was very fortunate that our founder and technical director, John Clynes, supported my wish to build it as an open source library. I originally wrote this post for the VMWare blog as a case study of running RabbitMQ in a Microsoft environment.
15Below is based in Brighton on the south coast of England, famous for its Regency pavilion and Victorian pier. We provide messaging and integration services for the travel industry. Our clients include Ryanair, Qantas, JetBlue, Thomas Cook and around 30 other airline and rail customers. We send hundreds of millions of transactional notifications each year to our customers’ passengers.
RabbitMQ has helped us to significantly simplify and stabilise our software. It’s one of those black boxes that you install, configure, and then really don’t have to worry about. In over a year of production we’ve found it to be extremely stable and reliable.
Prior to introducing RabbitMQ our applications would use SQL Server as a queuing mechanism. Each task would be represented by a row in a workflow table. Each process in the workflow would poll the table looking for rows that matched its status, process the rows in a batch, and then update the rows’ status field for the next process to pick up. Each step in the process would be hosted by an application service that implemented its own threading model, often using a different approach to all the other services. This created highly coupled software, with workflow steps and infrastructure concerns, such as threading and load balancing, mixed together with business logic. We also discovered that a relational database is not a natural fit for a queuing system. The contention on the workflow tables is high, with constant inserts, selects and updates causing locking issues. Deleting completed items is also problematic on highly indexed tables and we had considerable problems with continuously growing tables.
I wrote about the ‘Database As Queue Anti-Pattern’ in a previous post in more detail.
RabbitMQ provides a number of features that helped us overcome these problems. Firstly it is designed from the beginning as a high-performance messaging platform. It easily outperformed our SQL Server based solution with none of its locking or deletion problems. Rabbit’s event-oriented messaging model also takes away much of the need for complex multi-threaded batch processing code that was previously a cause of instability in our systems.
We first introduced RabbitMQ about 18 months ago as the core infrastructure behind our Flight Status product. We wanted a high performance messaging product with a proven track record that supported common messaging patterns, such as publish/subscribe and request/response. A further requirement was that it should provide automatic work distribution and load balancing.
The need to support messaging patterns ruled out simple store-and-forward queues such as MSMQ and ActiveMQ. We were very impressed by ZeroMQ, but decided that we really needed the centralised manageability of a broker based product. This left RabbitMQ. Although support for AMQP, an open messaging standard, wasn’t in our list of requirements, its implementation by RabbitMQ made us more confident that we were choosing a sustainable strategy.
We are very much a Microsoft shop, so we had some initial concerns about RabbitMQ’s performance and stability on Windows. We were reassured by reading some accounts of RabbitMQ’s and indeed Erlang’s use on Windows by organisations with some very impressive load requirements. Subsequent experience has borne these reports out, and we have found RabbitMQ on Server 2008 to be rock solid.
As a Microsoft shop, our development platform is .NET. Although VMWare provide an AMQP C# client, it is a low-level API, not suitable for use by more junior developers. For this reason we created our own high-level .NET API for RabbitMQ that provides simple single method access to common messaging patterns and does not require a deep knowledge of AMQP. This API is called EasyNetQ. We’ve open sourced it and, with over 3000 downloads, it is now the leading high-level API for RabbitMQ with .NET. You can find more information about it at EasyNetQ.com. We would recommend looking at it if you are a .NET shop using RabbitMQ.
15Below’s Flight-Status product provides real-time flight information to passengers and their family and friends. We interface with the airline’s real-time flight information stream generated from their operational systems and provide a platform that allows them to apply complex business logic to this stream. We render customer tailored output, and communicate with the airline’s customers via a range of channels, including email, SMS, voice and iPhone/Android push. RabbitMQ allows us to build each piece (the client for the flight information stream, the message renderer, the sink channels and the business logic) as separate components that communicate using structured messages. Our architecture looks something like this:
rabbit_based_architecture
The green boxes are our core product systems, the blue boxes represent custom code that we write for each customer. A ‘customer saga’ is code that models a long-running business process and includes all the workflow logic for a particular customer’s flight information requirements. A ‘core product service’ is an independent service that implements a feature of our product. An example would be the service that takes flight information and combines it with a customer defined template to create an email to be sent to a passenger. Constructing services as independently deployable and runnable applications gives us great flexibility and scalability. If we need to scale up a particular component, we simply install more copies. RabbitMQ’s automatic work sharing feature means that we can do this without any reconfiguration of existing components. This architecture also makes it easy to test each application service in isolation since it’s simply a question of firing messages at the service and watching its response.
In conclusion, RabbitMQ has provided a rock solid piece of infrastructure with the features to allow us to significantly reduce the architectural complexity of our systems. We can now build software for our clients faster and more reliably. It scales to higher loads than our previous relational-database based systems and is more flexible in the face of changing customer requirements.

Friday, November 23, 2012

Using BlockingCollection To Communicate Between Threads

Consider these (somewhat) common programming challenges:

  • I’m using a third party library that is not thread safe, but I want my application to share work between multiple threads. How do I marshal calls between my multi-threaded code to the single threaded library?
  • I have a single source of events on a single thread, but I want to share the work between a pool of multiple threads. How do I hand the events out?
  • I have multiple threads emitting events, but I want to consume them on a single thread. How do I bring them back together?

One way of doing this would be to have some shared state, a field or a property on a static class, and wrap locks around it so that multiple threads can access it safely. This is a pretty common way of trying to skin this particular cat, but it’s shot through with traps for the unwary. Also, it can hurt performance because access to the shared resource is serialized, even though the things accessing it are running in parallel.

A better way is to use a BlockingCollection and have your threads communicate via message classes.

BlockingCollection is a class in the new System.Collections.Concurrent namespace that arrived with .NET 4.0. It contains a ConcurrentQueue, although you can swap this for a ConcurrentStack or a ConcurrentBag if you want. You push objects in at one end and sit in a loop consuming them from the other. The (multiple) producer(s) and (multiple) consumer(s) can be running on different threads without any locks. That’s OK because the Concurrent namespace collection classes are guaranteed to be thread safe. The ‘blocking’ part of the name is there because the consuming end blocks until an object is available. Justin Etheredge has an excellent post that looks at BlockingCollection in more detail here.
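To make the point about swapping the underlying collection concrete, here is a small sketch. The class and method names are invented for the example, and everything runs on one thread so that the ordering is deterministic.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BlockingCollectionOrdering
{
    // Adds 1, 2, 3, marks the collection as complete, then drains it.
    // GetConsumingEnumerable ends once CompleteAdding has been called
    // and the collection is empty.
    public static List<int> AddThenDrain(BlockingCollection<int> collection)
    {
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);
        collection.CompleteAdding();

        var drained = new List<int>();
        foreach (var item in collection.GetConsumingEnumerable())
        {
            drained.Add(item);
        }
        return drained;
    }

    // The default constructor wraps a ConcurrentQueue: first in, first out.
    public static List<int> DrainQueue()
    {
        return AddThenDrain(new BlockingCollection<int>());
    }

    // Passing a ConcurrentStack gives last in, first out instead.
    public static List<int> DrainStack()
    {
        return AddThenDrain(new BlockingCollection<int>(new ConcurrentStack<int>()));
    }
}
```

DrainQueue returns 1, 2, 3 while DrainStack returns 3, 2, 1.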

For an example, let’s implement a parallel pipeline. A ventilator produces tasks to be processed in parallel, a set of workers process the tasks on separate threads, and a sink collects the results back together again. It shows both one-to-many and many-to-one thread communication. I’ve stolen the idea and the diagram from the excellent ZeroMQ Guide:

zguide_parallel_workers

First we’ll need a class that represents a piece of work. We’ll keep it super simple for this example:

public class WorkItem
{
    public string Text { get; set; }
}

We’ll need two BlockingCollections, one to take the tasks from the ventilator to the workers, and another to take the finished work from the workers to the sink:

var ventilatorQueue = new BlockingCollection<WorkItem>();
var sinkQueue = new BlockingCollection<WorkItem>();

Now let’s write our ventilator:

public static void StartVentilator(BlockingCollection<WorkItem> ventilatorQueue)
{
    Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            ventilatorQueue.Add(new WorkItem { Text = string.Format("Item {0}", i) });
        }
    }, TaskCreationOptions.LongRunning);
}

It just iterates 100 times creating work items and pushing them on the ventilatorQueue.

Here is a worker:

public static void StartWorker(int workerNumber,
    BlockingCollection<WorkItem> ventilatorQueue,
    BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in ventilatorQueue.GetConsumingEnumerable())
        {
            // pretend to take some time to process
            Thread.Sleep(30);
            workItem.Text = workItem.Text + " processed by worker " + workerNumber;
            sinkQueue.Add(workItem);
        }
    }, TaskCreationOptions.LongRunning);
}

BlockingCollection provides a GetConsumingEnumerable method that yields each item in turn. It blocks if there are no items on the queue. Note that I’m not worrying about shutdown patterns in this code. In production code you’d need to worry about how to close down your worker threads.
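One possible shutdown pattern, sketched here under simplifying assumptions, is to call CompleteAdding once the producer has finished. Each worker's foreach loop then ends when the queue is empty, and the caller can wait for the worker tasks. The ShutdownSketch class and its ProcessAll method are hypothetical and are not part of the example in this post.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    // Processes every item on a pool of workers, shuts the workers down
    // cleanly, and returns the number of items processed.
    public static int ProcessAll(IEnumerable<string> items, int workerCount)
    {
        var workQueue = new BlockingCollection<string>();
        var results = new ConcurrentBag<string>();

        var workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            int workerNumber = i; // copy so each closure sees its own value
            workers[i] = Task.Factory.StartNew(() =>
            {
                // this loop ends once CompleteAdding has been called
                // and the queue has been emptied
                foreach (var item in workQueue.GetConsumingEnumerable())
                {
                    results.Add(item + " processed by worker " + workerNumber);
                }
            }, TaskCreationOptions.LongRunning);
        }

        foreach (var item in items)
        {
            workQueue.Add(item);
        }

        // signal that no more work is coming, then wait for the workers to finish
        workQueue.CompleteAdding();
        Task.WaitAll(workers);

        return results.Count;
    }
}
```

A long-running service would more likely need a CancellationToken passed to GetConsumingEnumerable, so that workers can also be stopped while the producer is still running.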

Next let’s write our sink:

public static void StartSink(BlockingCollection<WorkItem> sinkQueue)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var workItem in sinkQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Processed Messsage: {0}", workItem.Text);
        }
    }, TaskCreationOptions.LongRunning);
}

Once again, this sits in an infinite foreach loop consuming items from the sinkQueue.

Finally we need to wire up the pieces and kick it off:

StartSink(sinkQueue);

StartWorker(0, ventilatorQueue, sinkQueue);
StartWorker(1, ventilatorQueue, sinkQueue);
StartWorker(2, ventilatorQueue, sinkQueue);

StartVentilator(ventilatorQueue);

I’ve started the sink first, then the workers and finally the producer. It doesn’t overly matter what order they start in since the queues will store any tasks the ventilator creates before the workers and the sink start.

Running the code I get output something like this:

Processed Messsage: Item 1 processed by worker 1
Processed Messsage: Item 2 processed by worker 0
Processed Messsage: Item 0 processed by worker 2
Processed Messsage: Item 5 processed by worker 2
Processed Messsage: Item 3 processed by worker 1

....

Processed Messsage: Item 95 processed by worker 0
Processed Messsage: Item 98 processed by worker 0
Processed Messsage: Item 97 processed by worker 2
Processed Messsage: Item 96 processed by worker 1
Processed Messsage: Item 99 processed by worker 0

This pattern is a great way of decoupling the communication between a source and a sink, or a producer and a consumer. It also allows you to have multiple sources and multiple sinks, but primarily it’s a safe way for multiple threads to interact.

The complete example is here on GitHub.

Thursday, November 15, 2012

A C# .NET Client Proxy For The RabbitMQ Management API

RabbitMQ comes with a very nice Management UI and an HTTP JSON API that allow you to configure and monitor your RabbitMQ broker. From the website:

The rabbitmq-management plugin provides an HTTP-based API for management and monitoring of your RabbitMQ server, along with a browser-based UI and a command line tool, rabbitmqadmin. Features include:

  • Declare, list and delete exchanges, queues, bindings, users, virtual hosts and permissions.
  • Monitor queue length, message rates globally and per channel, data rates per connection, etc.
  • Send and receive messages.
  • Monitor Erlang processes, file descriptors, memory use.
  • Export / import object definitions to JSON.
  • Force close connections, purge queues.

Wouldn’t it be cool if you could do all these management tasks from your .NET code? Well now you can. I’ve just added a new project to EasyNetQ called EasyNetQ.Management.Client. This is a .NET client-side proxy for the HTTP-based API.

It’s on NuGet, so to install it, you simply run:

PM> Install-Package EasyNetQ.Management.Client

To give an overview of the sort of things you can do with EasyNetQ.Management.Client, have a look at this code. It first creates a new Virtual Host and a User, and gives the User permissions on the Virtual Host. Then it re-connects as the new user, creates an exchange and a queue, binds them, and publishes a message to the exchange. Finally it gets the first message from the queue and outputs it to the console.

var initial = new ManagementClient("http://localhost", "guest", "guest");

// first create a new virtual host
var vhost = initial.CreateVirtualHost("my_virtual_host");

// next create a user for that virtual host
var user = initial.CreateUser(new UserInfo("mike", "topSecret"));

// give the new user all permissions on the virtual host
initial.CreatePermission(new PermissionInfo(user, vhost));

// now log in again as the new user
var management = new ManagementClient("http://localhost", user.name, "topSecret");

// test that everything's OK
management.IsAlive(vhost);

// create an exchange
var exchange = management.CreateExchange(new ExchangeInfo("my_exchange", "direct"), vhost);

// create a queue
var queue = management.CreateQueue(new QueueInfo("my_queue"), vhost);

// bind the exchange to the queue
management.CreateBinding(exchange, queue, new BindingInfo("my_routing_key"));

// publish a test message
management.Publish(exchange, new PublishInfo("my_routing_key", "Hello World!"));

// get any messages on the queue
var messages = management.GetMessagesFromQueue(queue, new GetMessagesCriteria(1, false));

foreach (var message in messages)
{
    Console.Out.WriteLine("message.payload = {0}", message.payload);
}

This library is also ideal for monitoring queue levels, channels and connections on your RabbitMQ broker. For example, this code prints out details of all the current connections to the RabbitMQ broker:

var connections = managementClient.GetConnections();

foreach (var connection in connections)
{
Console.Out.WriteLine("connection.name = {0}", connection.name);
Console.WriteLine("user:\t{0}", connection.client_properties.user);
Console.WriteLine("application:\t{0}", connection.client_properties.application);
Console.WriteLine("client_api:\t{0}", connection.client_properties.client_api);
Console.WriteLine("application_location:\t{0}", connection.client_properties.application_location);
Console.WriteLine("connected:\t{0}", connection.client_properties.connected);
Console.WriteLine("easynetq_version:\t{0}", connection.client_properties.easynetq_version);
Console.WriteLine("machine_name:\t{0}", connection.client_properties.machine_name);
}

On my machine, with one consumer running it outputs this:
 
connection.name = [::1]:64754 -> [::1]:5672
user: guest
application: EasyNetQ.Tests.Performance.Consumer.exe
client_api: EasyNetQ
application_location: D:\Source\EasyNetQ\Source\EasyNetQ.Tests.Performance.Consumer\bin\Debug
connected: 14/11/2012 15:06:19
easynetq_version: 0.9.0.0
machine_name: THOMAS

You can see the name of the application that’s making the connection, the machine it’s running on and even its location on disk. That’s rather nice. From this information it wouldn’t be too hard to auto-generate a complete system diagram of your distributed messaging application. Now there’s an idea :)

For more information, check out the documentation.

Monday, November 12, 2012

Nicer Client Properties For EasyNetQ

EasyNetQ is my lightweight easy-to-use .NET API for RabbitMQ.

Today I added a small but very nice feature: better client properties. Now when you look at connections created by EasyNetQ you can see the machine that connected, the application and the application’s location on disk. It also gives you the date and time that EasyNetQ first connected. Very useful for debugging.

Here’s an example. Check out the ‘Client Properties’ section.

[Image: nice_connection_properties]

Tuesday, November 06, 2012

EasyNetQ Publisher Confirms

EasyNetQ is my easy-to-use .NET API for RabbitMQ.

The default AMQP publish is not transactional and doesn't guarantee that your message will actually reach the broker. AMQP does specify a transactional publish, but with RabbitMQ it is extremely slow, around 200 times slower than a non-transactional publish, and we haven't supported it via the EasyNetQ API. For high-performance guaranteed delivery it's recommended that you use 'Publisher Confirms'. Simply speaking, this is an extension to AMQP that provides a call-back when your message has been successfully received by the broker.

What does 'successfully received' mean? It depends ...

  • A transient message is confirmed the moment it is enqueued.
  • A persistent message is confirmed as soon as it is persisted to disk, or when it is consumed on every queue.
  • An unroutable transient message is confirmed as soon as it is published.

For more information on publisher confirms, please read the announcement on the RabbitMQ blog.

To use publisher confirms, you must first create the publish channel with publisher confirms on:

var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms());

Next you must specify success and failure callbacks when you publish your message:

channel.Publish(message, x => 
x.OnSuccess(() =>
{
// do success processing here
})
.OnFailure(() =>
{
// do failure processing here
}));

Be careful not to dispose the publish channel before your call-backs have had a chance to execute.

Here's an example of a simple test. We're publishing 10,000 messages and then waiting for them all to be acknowledged before disposing the channel. There's a timeout, so if the batch takes longer than 10 seconds we abort with an exception.

const int batchSize = 10000;
var callbackCount = 0;
var stopwatch = new Stopwatch();
stopwatch.Start();

using (var channel = bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
{
for (int i = 0; i < batchSize; i++)
{
var message = new MyMessage {Text = string.Format("Hello Message {0}", i)};
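channel.Publish(message, x =>
x.OnSuccess(() =>
{
// the callbacks may run on another thread, so increment atomically
Interlocked.Increment(ref callbackCount);
})
.OnFailure(() =>
{
Interlocked.Increment(ref callbackCount);
}));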
}

// wait until all the publications have been acknowledged.
while (callbackCount < batchSize)
{
// TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
if (stopwatch.Elapsed.TotalSeconds > 10)
{
throw new ApplicationException("Aborted batch with timeout");
}
Thread.Sleep(10);
}
}
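As an alternative to polling a counter, here is a minimal sketch of waiting on a CountdownEvent instead. It doesn't touch EasyNetQ at all: FakePublish is a hypothetical stand-in for channel.Publish that fires one of the two callbacks on a thread-pool thread, and the class and method names are made up for the illustration.

```csharp
using System;
using System.Threading;

public static class ConfirmWaitSketch
{
    // Hypothetical stand-in for channel.Publish: runs one of the two
    // callbacks on a thread-pool thread, as a confirm callback might.
    private static void FakePublish(int index, Action onSuccess, Action onFailure)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            if (index % 100 == 99) onFailure(); else onSuccess();
        });
    }

    // Returns true if every callback fired before the timeout elapsed.
    public static bool PublishBatchAndWait(int batchSize, TimeSpan timeout)
    {
        // Deliberately not disposed: after a timeout a late callback
        // could still call Signal() on it.
        var pending = new CountdownEvent(batchSize);

        for (var i = 0; i < batchSize; i++)
        {
            // Success and failure both count as "this publish has been answered".
            FakePublish(i, () => pending.Signal(), () => pending.Signal());
        }

        // Blocks without spinning until the count reaches zero or the timeout expires.
        return pending.Wait(timeout);
    }
}
```

With a real channel you would pass the same two lambdas to OnSuccess and OnFailure, and only dispose the channel once the wait has returned.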

Wednesday, October 10, 2012

A Functional IoC Container

Today I was idly thinking about an idea I had a couple of years ago for a functional IoC container. I’d had a go at implementing such a beast, but soon got bogged down in a tangled mess of spaghetti reflection code and gave it up as too much bother. But today it suddenly occurred to me that there was no need for any reflection voodoo; the C# type system is powerful enough to do all the work for us.

In object oriented programming languages we build programs from classes. Classes declare the contract(s) they support with interfaces and declare their dependencies with constructor arguments. We use an IoC container to wire instances of our classes together to make a running program.

Pure functional languages, like Haskell, don’t have any concept of class; instead they use currying and partial application to compose hierarchies of functions.

Here’s an example of a purely functional program written in C#.

public static class Module
{
public static Data GetAndTransform(Func<Input,Data> dataAccsessor, Func<Data,Data> transformer, int id)
{
var input = new Input() {Id = id};
var data = dataAccsessor(input);
var transformed = transformer(data);
return transformed;
}

public static Data DataAccsessor(Input input)
{
return new Data
{
Id = input.Id,
Name = "Test"
};
}

public static Data Transformer(Data original)
{
original.Name = original.Name + " transformed";
return original;
}
}

GetAndTransform simply takes an int id argument, does some work, and then returns some data. It needs a dataAccsessor and a transformer in order to do its job.

C# has no built-in support for currying or partial application, so in order to run it we have to compose the program and execute it all in one step. For example:

var id = 10;
var data = Module.GetAndTransform(Module.DataAccsessor, Module.Transformer, id);

Console.Out.WriteLine("data.Id = {0}", data.Id);
Console.Out.WriteLine("data.Name = {0}", data.Name);

But what if we had a ‘currying container’, one that could compose the program in one step and then return a function for us to execute in another? Here is such a container at work with our program:

var registration = new Container()
.Register<Func<Input, Data>, Func<Data, Data>, int, Data>(Module.GetAndTransform)
.Register<Input,Data>(Module.DataAccsessor)
.Register<Data,Data>(Module.Transformer);

var main = registration.Get<Func<int, Data>>();

var data = main(10);

Console.Out.WriteLine("data.Id = {0}", data.Id);
Console.Out.WriteLine("data.Name = {0}", data.Name);

In the first line, we create a new instance of our container. On the next three lines we register our functions. Unfortunately C#’s type inference isn’t powerful enough to let us do away with the tedious type annotations; we have to explicitly declare the argument and return types of each of our functions.

Once our functions are registered we can ask the container for a program (main) that takes an int and returns a Data instance. The container works out that it needs to curry GetAndTransform and then partially apply DataAccsessor and Transformer to it to produce the desired function.

We can then run our ‘main’ function which gives us our expected output:

data.Id = 10
data.Name = Test transformed
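To make the "curry, then partially apply" step concrete, here is a rough sketch of what the container works out, done by hand. The GetAndTransform below is a simplified stand-in that uses int and string rather than Input and Data, and the CurrySketch class is purely illustrative.

```csharp
using System;

public static class CurrySketch
{
    // Turns a three-argument function into a chain of one-argument functions.
    public static Func<A, Func<B, Func<C, R>>> Curry<A, B, C, R>(Func<A, B, C, R> f)
    {
        return a => b => c => f(a, b, c);
    }

    // Simplified stand-in with the same shape as GetAndTransform:
    // two dependencies followed by the real input.
    public static string GetAndTransform(
        Func<int, string> accessor, Func<string, string> transformer, int id)
    {
        return transformer(accessor(id));
    }

    public static Func<int, string> Compose()
    {
        Func<int, string> accessor = id => "item " + id;
        Func<string, string> transformer = s => s + " transformed";

        var curried = Curry<Func<int, string>, Func<string, string>, int, string>(GetAndTransform);

        // Supplying the two dependencies leaves a function of the last argument only.
        return curried(accessor)(transformer);
    }
}
```

Calling CurrySketch.Compose()(10) runs the composed program in a second step, which is the same split between composition and execution that the container gives us.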

The container turns out to be very simple, just a dictionary that’s keyed by type and contains a collection of constructor functions that know how to build the target (key) type.

public interface IRegistration
{
void Add(Type target, Func<object> constructor);
T Get<T>();
}

public class Container : IRegistration
{
private readonly Dictionary<Type, Func<object>> registrations = new Dictionary<Type, Func<object>>();

public void Add(Type target, Func<object> constructor)
{
registrations.Add(target, constructor);
}

public T Get<T>()
{
return (T)registrations[typeof (T)]();
}
}

The magic sauce is in the Register extension method overloads. If you take the standard functional idea that a function should have only one argument and one return value, you can take any input function, curry it, and then partially apply arguments until you are left with a Func<X,Y>. So you know what the ‘target’ type of each function should be: a function from the last argument to the return type. For example, a Func<A,B,C,R> gets resolved to a Func<C,R>. There’s no need to explicitly register a target; it’s implicit in the type of the provided function:

public static class RegistrationExtensions
{
public static IRegistration Register<A,R>(this IRegistration registration, Func<A, R> source)
{
var targetType = typeof (Func<A, R>);
var curried = Functional.Curry(source);

registration.Add(targetType, () => curried);

return registration;
}

public static IRegistration Register<A,B,R>(this IRegistration registration, Func<A, B, R> source)
{
var targetType = typeof (Func<B, R>);
var curried = Functional.Curry(source);

registration.Add(targetType, () => curried(
registration.Get<A>()
));

return registration;
}

public static IRegistration Register<A, B, C, R>(this IRegistration registration, Func<A, B, C, R> source)
{
var targetType = typeof(Func<C, R>);
var curried = Functional.Curry(source);

registration.Add(targetType, () => curried(
registration.Get<A>()
)
(
registration.Get<B>()
));

return registration;
}
}

Each overload deals with an input function with a different number of arguments. My simple experiment only works with functions with up to three arguments (two dependencies and an input type), but it would be easy to extend for higher numbers. The Curry function is stolen from Oliver Sturm and looks like this:

public static class Functional
{
public static Func<A, R> Curry<A, R>(Func<A, R> input)
{
return input;
}

public static Func<A, Func<B, R>> Curry<A, B, R>(Func<A, B, R> input)
{
return a => b => input(a, b);
}

public static Func<A, Func<B, Func<C,R>>> Curry<A, B, C, R>(Func<A, B, C, R> input)
{
return a => b => c => input(a, b, c);
}
}

Rather nice, even if I say so myself.

Of course this little experiment has many limitations. For a start it only understands functions in terms of Func< … >, so you can’t have more than one function of each ‘type’. You couldn’t have two Func<int,int> for example, which might be somewhat limiting.
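A small sketch of why that limitation bites: the registrations dictionary is keyed by the Func type, and Dictionary.Add throws on a duplicate key. The class below is only an illustration and reuses nothing but the dictionary shape from the Container above.

```csharp
using System;
using System.Collections.Generic;

public static class DuplicateRegistrationSketch
{
    // Returns true when registering a second Func<int, int> is rejected.
    public static bool SecondRegistrationFails()
    {
        var registrations = new Dictionary<Type, Func<object>>();

        Func<int, int> doubler = x => x * 2;
        Func<int, int> increment = x => x + 1;

        registrations.Add(typeof(Func<int, int>), () => doubler);
        try
        {
            // Same key as before: both functions have the type Func<int, int>.
            registrations.Add(typeof(Func<int, int>), () => increment);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }
}
```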

The code is on GitHub here if you want to have a play.

Wednesday, October 03, 2012

EasyNetQ Cluster Support

EasyNetQ, my super simple .NET API for RabbitMQ, now (from version 0.7.2.34) supports RabbitMQ clusters without any need to deploy a load balancer.

Simply list the nodes of the cluster in the connection string ...

var bus = RabbitHutch.CreateBus("host=ubuntu:5672,ubuntu:5673");

In this example I have set up a cluster on a single machine, 'ubuntu', with node 1 on port 5672 and node 2 on port 5673. When the CreateBus statement executes, EasyNetQ will attempt to connect to the first host listed (ubuntu:5672). If it fails to connect it will attempt to connect to the second host listed (ubuntu:5673). If neither node is available it will sit in a re-try loop attempting to connect to both servers every five seconds. It logs all this activity to the registered IEasyNetQLogger. You might see something like this if the first node was unavailable:

DEBUG: Trying to connect
ERROR: Failed to connect to Broker: 'ubuntu', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable'
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/'

If the node that EasyNetQ is connected to fails, EasyNetQ will attempt to connect to the next listed node. Once connected, it will re-declare all the exchanges and queues and re-start all the consumers. Here's an example log record showing one node failing then EasyNetQ connecting to the other node and recreating the subscribers:

INFO: Disconnected from RabbitMQ Broker
DEBUG: Trying to connect
DEBUG: OnConnected event fired
DEBUG: Re-creating subscribers
INFO: Connected to RabbitMQ. Broker: 'ubuntu', Port: 5674, VHost: '/'

You get automatic fail-over out of the box. That’s pretty cool.

If you have multiple services using EasyNetQ to connect to a RabbitMQ cluster, they will all initially connect to the first listed node in their respective connection strings. For this reason the EasyNetQ cluster support is not really suitable for load balancing high throughput systems. I would recommend that you use a dedicated hardware or software load balancer instead, if that’s what you want.
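The "try each listed node in order" behaviour can be sketched in a few lines. This is not EasyNetQ's actual code: ClusterSketch, ParseHosts and the tryConnect callback are all hypothetical names for the illustration, and the parsing assumes plain host:port entries (no IPv6 literals).

```csharp
using System;
using System.Collections.Generic;

public static class ClusterSketch
{
    // 5672 is the standard AMQP port, used when an entry has no port.
    public const int DefaultAmqpPort = 5672;

    // Splits "ubuntu:5672,ubuntu:5673" into (host, port) pairs, keeping the listed order.
    public static List<KeyValuePair<string, int>> ParseHosts(string hostList)
    {
        var hosts = new List<KeyValuePair<string, int>>();
        foreach (var entry in hostList.Split(','))
        {
            var parts = entry.Trim().Split(':');
            var port = parts.Length > 1 ? int.Parse(parts[1]) : DefaultAmqpPort;
            hosts.Add(new KeyValuePair<string, int>(parts[0], port));
        }
        return hosts;
    }

    // Tries each host in order; returns the index of the first one that
    // accepts the connection, or -1 if none did (the caller would then retry later).
    public static int ConnectToFirstAvailable(
        IList<KeyValuePair<string, int>> hosts, Func<string, int, bool> tryConnect)
    {
        for (var i = 0; i < hosts.Count; i++)
        {
            if (tryConnect(hosts[i].Key, hosts[i].Value)) return i;
        }
        return -1;
    }
}
```

Because the loop always starts from the first entry, every client that shares the same connection string lands on the same node, which is the load balancing caveat described above.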

Friday, September 28, 2012

Parsing a Connection String With Sprache

Sprache is a very cool lightweight parser library for C#. Today I was experimenting with parsing EasyNetQ connection strings, so I thought I’d have a go at getting Sprache to do it. An EasyNetQ connection string is a list of key-value pairs like this:

key1=value1;key2=value2;key3=value3

The motivation for looking at something more sophisticated than simply chopping strings based on delimiters is that I’m thinking of having more complex values that would themselves need parsing. But that’s for the future; today I’m just going to parse a simple connection string where the values can be strings or numbers (ushort to be exact).
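For comparison, the "chopping strings based on delimiters" approach might look something like this. It is only a baseline sketch (the NaiveConnectionString class is made up for the illustration): it gives you untyped strings, and it has nowhere to go once a value needs a grammar of its own.

```csharp
using System;
using System.Collections.Generic;

public static class NaiveConnectionString
{
    // Splits "key1=value1;key2=value2" into a case-insensitive dictionary of raw strings.
    public static Dictionary<string, string> Split(string connectionString)
    {
        var result = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var pairs = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var pair in pairs)
        {
            // Split on the first '=' only, so values may themselves contain '='.
            var parts = pair.Split(new[] { '=' }, 2);
            if (parts.Length != 2)
            {
                throw new FormatException("Expected key=value but found: " + pair);
            }
            result[parts[0].Trim()] = parts[1];
        }
        return result;
    }
}
```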

So, I want to parse a connection string that looks like this:

virtualHost=Copa;username=Copa;host=192.168.1.1;password=abc_xyz;port=12345;requestedHeartbeat=3

… into a strongly typed structure like this:

public class ConnectionConfiguration : IConnectionConfiguration
{
public string Host { get; set; }
public ushort Port { get; set; }
public string VirtualHost { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public ushort RequestedHeartbeat { get; set; }
}

I want it to be as easy as possible to add new connection string items.

First let’s define a name for a function that updates a ConnectionConfiguration. An uncommonly used version of the ‘using’ statement allows us to give a short name to a complex type:

using UpdateConfiguration = Func<ConnectionConfiguration, ConnectionConfiguration>;

Now let’s define a little function that creates a Sprache parser for a key-value pair. We supply the key and a parser for the value and get back a parser that can update the ConnectionConfiguration.

public static Parser<UpdateConfiguration> BuildKeyValueParser<T>(
string keyName,
Parser<T> valueParser,
Expression<Func<ConnectionConfiguration, T>> getter)
{
return
from key in Parse.String(keyName).Token()
from separator in Parse.Char('=')
from value in valueParser
select (Func<ConnectionConfiguration, ConnectionConfiguration>)(c =>
{
CreateSetter(getter)(c, value);
return c;
});
}

The CreateSetter is a little function that turns a property expression (like x => x.Name) into an Action<TTarget, TProperty>.
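CreateSetter isn't shown here, so this is only a guess at its shape: a minimal sketch, assuming the expression is a plain property access, that uses reflection to write the property. SetterSketch and Person are hypothetical names for the illustration.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

public static class SetterSketch
{
    // Turns a property expression such as x => x.Name into a setter for that property.
    public static Action<TTarget, TProperty> CreateSetter<TTarget, TProperty>(
        Expression<Func<TTarget, TProperty>> getter)
    {
        // The body of x => x.Name is a MemberExpression pointing at the property.
        var member = getter.Body as MemberExpression;
        var property = member == null ? null : member.Member as PropertyInfo;

        if (property == null || !property.CanWrite)
        {
            throw new ArgumentException("Expected a writable property, e.g. x => x.Name", "getter");
        }

        return (target, value) => property.SetValue(target, value, null);
    }
}

// Hypothetical type used only to try the setter out.
public class Person
{
    public string Name { get; set; }
}
```

Usage looks like this: SetterSketch.CreateSetter<Person, string>(p => p.Name) returns an action that you can call with a Person and a new name. A version that compiles the setter into a delegate would be faster, but reflection keeps the sketch short.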

Next let’s define parsers for string and number values:

public static Parser<string> Text = Parse.CharExcept(';').Many().Text();
public static Parser<ushort> Number = Parse.Number.Select(ushort.Parse);

Now we can chain a series of BuildKeyValueParser invocations and Or them together so that we can parse any of our expected key-values:

public static Parser<UpdateConfiguration> Part = new List<Parser<UpdateConfiguration>>
{
BuildKeyValueParser("host", Text, c => c.Host),
BuildKeyValueParser("port", Number, c => c.Port),
BuildKeyValueParser("virtualHost", Text, c => c.VirtualHost),
BuildKeyValueParser("requestedHeartbeat", Number, c => c.RequestedHeartbeat),
BuildKeyValueParser("username", Text, c => c.UserName),
BuildKeyValueParser("password", Text, c => c.Password),
}.Aggregate((a, b) => a.Or(b));

Each invocation of BuildKeyValueParser defines an expected key-value pair of our connection string. We just give the key name, the parser that understands the value, and the property on ConnectionConfiguration that we want to update. In effect we’ve defined a little DSL for connection strings. If I want to add a new connection string value, I simply add a new property to ConnectionConfiguration and a single line to the above code.

Now let’s define a parser for the entire string, by saying that we’ll parse any number of key-value parts:

public static Parser<IEnumerable<UpdateConfiguration>> ConnectionStringBuilder =
from first in Part
from rest in Parse.Char(';').Then(_ => Part).Many()
select Cons(first, rest);
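Cons isn't defined in this post. A plausible version, offered as an assumption rather than the actual helper, simply puts the first item in front of the rest (ListSketch is a made-up home for it):

```csharp
using System.Collections.Generic;

public static class ListSketch
{
    // Yields the head first, followed by every item of the rest, in order.
    public static IEnumerable<T> Cons<T>(T head, IEnumerable<T> rest)
    {
        yield return head;
        foreach (var item in rest)
        {
            yield return item;
        }
    }
}
```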

All we have to do now is parse the connection string and apply the chain of update functions to a ConnectionConfiguration instance:

public IConnectionConfiguration Parse(string connectionString)
{
var updater = ConnectionStringGrammar.ConnectionStringBuilder.Parse(connectionString);
return updater.Aggregate(new ConnectionConfiguration(), (current, updateFunction) => updateFunction(current));
}
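The Aggregate call is just a left fold: start with an empty configuration and pass it through each update function in turn. Here is the same idea with Sprache taken out of the picture; Settings and FoldSketch are hypothetical stand-ins for ConnectionConfiguration and the parser class.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, cut-down stand-in for ConnectionConfiguration.
public class Settings
{
    public string Host { get; set; }
    public ushort Port { get; set; }
}

public static class FoldSketch
{
    // Applies each update function in order, threading the same instance through.
    public static Settings Apply(IEnumerable<Func<Settings, Settings>> updates)
    {
        return updates.Aggregate(new Settings(), (current, update) => update(current));
    }
}
```

Each parsed key-value pair contributes one function to the sequence, so a later key simply overwrites whatever an earlier one set.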

We get lots of nice things out of the box with Sprache; one of the best is the excellent error messages:

Parsing failure: unexpected 'x'; expected host or port or virtualHost or requestedHeartbeat or username or password (Line 1, Column 1).

Sprache is really nice for this kind of task. I’d recommend checking it out.

Tuesday, September 25, 2012

Replacing EasyNetQ Components

EasyNetQ, my simple .NET API for RabbitMQ, is a library composed of small components. Until today, the code simply wired the components in a messy hard-coded routine. Now it has its own tiny internal IoC container. When you write:
var bus = RabbitHutch.CreateBus("host=localhost");

... the static method CreateBus registers the components with the container and then resolves the IBus instance. The really cool thing about this is that it allows you, the user, to replace any of the internal components, including IBus, with your own implementations. An overload of the CreateBus method provides the hook which gives you access to the component registration. The signature looks like this:
public static IBus CreateBus(string connectionString, Action<IServiceRegister> registerServices)

The IServiceRegister interface provides a single method:
public interface IServiceRegister
{
    IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator) where TService : class;
}

So to register your own logger, based on IEasyNetQLogger, you'd write this code:
var logger = new MyLogger(); // MyLogger implements IEasyNetQLogger
var bus = RabbitHutch.CreateBus(connectionString, 
    serviceRegister => serviceRegister.Register(serviceProvider => logger));

The Register method's argument, Func<IServiceProvider, TService>, is a function that's run when CreateBus pulls together the components to make an IBus instance. IServiceProvider looks like this:
public interface IServiceProvider
{
    TService Resolve<TService>() where TService : class;
}

This allows you to access other services that EasyNetQ provides. If for example you wanted to replace the default serializer with your own implementation of ISerializer, and you wanted to construct it with a reference to the internal logger, you could do this:
var bus = RabbitHutch.CreateBus(connectionString, serviceRegister => serviceRegister.Register(
    serviceProvider => new MySerializer(serviceProvider.Resolve<IEasyNetQLogger>())));

There’s nothing to stop you registering your own interfaces with the container that you can then use with your implementations of EasyNetQ’s service interfaces.
To see the complete list of components that make up the IBus instance, and how they are assembled, take a look at the ComponentRegistration class.
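To show how little is needed to back those two interfaces, here is a minimal sketch of a container that implements both. It is not EasyNetQ's implementation: TinyContainer is a hypothetical name, and the choices that a later registration replaces an earlier one and that each service is created once are assumptions of the sketch.

```csharp
using System;
using System.Collections.Generic;

namespace ContainerSketch
{
    public interface IServiceProvider
    {
        TService Resolve<TService>() where TService : class;
    }

    public interface IServiceRegister
    {
        IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator)
            where TService : class;
    }

    public class TinyContainer : IServiceProvider, IServiceRegister
    {
        private readonly Dictionary<Type, Func<IServiceProvider, object>> factories =
            new Dictionary<Type, Func<IServiceProvider, object>>();
        private readonly Dictionary<Type, object> instances = new Dictionary<Type, object>();

        public IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator)
            where TService : class
        {
            // Assumption of this sketch: registering a type again replaces the earlier factory.
            factories[typeof(TService)] = provider => serviceCreator(provider);
            return this;
        }

        public TService Resolve<TService>() where TService : class
        {
            object instance;
            if (!instances.TryGetValue(typeof(TService), out instance))
            {
                Func<IServiceProvider, object> factory;
                if (!factories.TryGetValue(typeof(TService), out factory))
                {
                    throw new InvalidOperationException("No registration for " + typeof(TService).Name);
                }
                // The factory receives the container, so it can resolve its own dependencies.
                instance = factory(this);
                instances[typeof(TService)] = instance;
            }
            return (TService)instance;
        }
    }
}
```

The factory delegate is what makes the MySerializer example above work: the serializer's factory runs at resolve time and can ask the provider for the logger.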

Friday, September 14, 2012

Return a Task From BeginExecuteNonQuery

Blog as notepad time. Just a little reminder for myself on how to return the result from BeginExecuteNonQuery as a Task<int>.

public Task<int> Save(string value)
{
var taskCompletionSource = new TaskCompletionSource<int>();

var connection = new SqlConnection(connectionString);
connection.Open();
var command = new SqlCommand("uspSaveSomeValue", connection)
{
CommandType = CommandType.StoredProcedure
};
command.Parameters.AddWithValue("@myparam", value);
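command.BeginExecuteNonQuery(asyncResult =>
{
try
{
// EndExecuteNonQuery rethrows any error raised while the command ran
var result = command.EndExecuteNonQuery(asyncResult);
taskCompletionSource.SetResult(result);
}
catch (Exception exception)
{
// surface the failure on the task rather than losing it on the callback thread
taskCompletionSource.SetException(exception);
}
finally
{
command.Dispose();
connection.Dispose();
}
}, null);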

return taskCompletionSource.Task;
}

If you know a better way, please comment below.

Yes, yes, I know, but I’m not working on .NET 4.5 yet :(

Update: Ken Egozi suggested using Task<int>.Factory.FromAsync. Of course! I’d been doing so much TaskCompletionSource manual task creation recently that I’d forgotten about this useful shortcut. Here’s a more succinct version using FromAsync:

public Task<int> Save(string value)
{
var connection = new SqlConnection(connectionString);
connection.Open();
var command = new SqlCommand("uspSaveSomeValue", connection)
{
CommandType = CommandType.StoredProcedure
};
command.Parameters.AddWithValue("@myparam", value);

return Task<int>.Factory.FromAsync(command.BeginExecuteNonQuery(), asyncResult =>
{
try
{
return command.EndExecuteNonQuery(asyncResult);
}
finally
{
// clean up whether or not the command succeeded
command.Dispose();
connection.Dispose();
}
});
}
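On the calling side, without async/await, the returned task is consumed with ContinueWith. A minimal sketch, where FakeSave is a hypothetical stand-in for Save so that it runs without a database:

```csharp
using System;
using System.Threading.Tasks;

public static class TaskResultSketch
{
    // Hypothetical stand-in for Save: completes immediately, with or without an error.
    public static Task<int> FakeSave(bool fail)
    {
        var source = new TaskCompletionSource<int>();
        if (fail)
        {
            source.SetException(new InvalidOperationException("save failed"));
        }
        else
        {
            source.SetResult(1);
        }
        return source.Task;
    }

    // Maps the finished task to a message, checking for a fault before touching Result.
    public static Task<string> Describe(Task<int> save)
    {
        return save.ContinueWith(t =>
            t.IsFaulted
                ? "failed: " + t.Exception.InnerException.Message
                : "rows affected: " + t.Result);
    }
}
```

Reading t.Exception in the continuation also marks the failure as observed, so it doesn't resurface later as an unobserved task exception.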