Monday, April 22, 2013

Stop Your Console App The Nice Way

When you write a console application, do you simply put

Console.WriteLine("Hit <enter> to end");
Console.ReadLine();

at the end of Main and block on Console.ReadLine()?

It’s much nicer to to make your console application work with Windows command line conventions and exit when the user types Control-C.

The Console class has a static event CancelKeyPress that fires when Ctrl-C is pressed. You can create an AutoResetEvent that blocks until you call Set in the CancelKeyPress handler.

It’s also nice to stop the application from being arbitrarily killed at the point where Ctrl-C is pressed by setting the EventArgs.Cancel property to true. This gives you a chance to complete what you are doing and exit the application cleanly.

Here’s an example. My little console application kicks off a worker thread and then blocks waiting for Ctrl-C as described above. When Ctrl-C is pressed I send a signal to the worker thread telling it to finish what it’s doing and exit.

using System;
using System.Threading;

namespace Mike.Spikes.ConsoleShutdown
{
class Program
{
private static bool cancel = false;

static void Main(string[] args)
{
Console.WriteLine("Application has started. Ctrl-C to end");

// do some cool stuff here
var myThread = new Thread(Worker);
myThread.Start();

var autoResetEvent = new AutoResetEvent(false);
Console.CancelKeyPress += (sender, eventArgs) =>
{
// cancel the cancellation to allow the program to shutdown cleanly
eventArgs.Cancel = true;
autoResetEvent.Set();
};

// main blocks here waiting for ctrl-C
autoResetEvent.WaitOne();
cancel = true;
Console.WriteLine("Now shutting down");
}

private static void Worker()
{
while (!cancel)
{
Console.WriteLine("Worker is working");
Thread.Sleep(1000);
}
Console.WriteLine("Worker thread ending");
}
}
}

Much nicer I think.

Thursday, April 18, 2013

Serializing Lua Coroutines With Pluto

In my last post I showed how it’s possible to use Lua as a distributed workflow scripting language because of its build in support for coroutines. But in order to create viable long-running workflows we have to have some way of saving the script’s state at the point that it pauses; we need a way to serialize the coroutine.

Enter Pluto:

Pluto is a heavy duty persistence library for Lua. Pluto is a library which allows users to write arbitrarily large portions of the "Lua universe" into a flat file, and later read them back into the same or a different Lua universe.

I downloaded the Win32 build of Tamed Pluto from here and placed it in a directory alongside my Pluto.dll. You have to tell Lua where to find C libraries by setting the Lua package.cpath:

using (var lua = new Lua())
{
lua["package.cpath"] = @"D:\Source\Mike.DistributedLua\Lua\?.dll";

...
}

Lua can now find the Pluto library and we can import it into our script with:

require('pluto')

Here’s a script which defines a function foo which calls a function bar. Foo is used to create a coroutine. Inside bar the coroutine yields. The script uses Pluto to serialize the yielded coroutine and saves it to a file.

-- import pluto, print out the version number 
-- and set non-human binary serialization scheme.
require('pluto')
print('pluto version '..pluto.version())
pluto.human(false)

-- perms are items to be substituted at serialization
perms = { [coroutine.yield] = 1 }

-- the functions that we want to execute as a coroutine
function foo()
local someMessage = 'And hello from a long dead variable!'
local i = 4
bar(someMessage)
print(i)
end

function bar(msg)
print('entered bar')
-- bar runs to here then yields
coroutine.yield()
print(msg)
end

-- create and start the coroutine
co = coroutine.create(foo)
coroutine.resume(co)

-- the coroutine has now stopped at yield. so we can
-- persist its state with pluto
buf = pluto.persist(perms, co)

-- save the serialized state to a file
outfile = io.open(persistCRPath, 'wb')
outfile:write(buf)
outfile:close()

This next script loads the serialized coroutine from disk, deserializes it, and runs it from the point that it yielded:

-- import pluto, print out the version number 
-- and set non-human binary serialization scheme.
require('pluto')
print('pluto version '..pluto.version())
pluto.human(false)

-- perms are items to be substituted at serialization
-- (reverse the key/value pair that you used to serialize)
perms = { [1] = coroutine.yield }

-- get the serialized coroutine from disk
infile, err = io.open(persistCRPath, 'rb')
if infile == nil then
error('While opening: ' .. (err or 'no error'))
end

buf, err = infile:read('*a')
if buf == nil then
error('While reading: ' .. (err or 'no error'))
end

infile:close()

-- deserialize it
co = pluto.unpersist(perms, buf)

-- and run it
coroutine.resume(co)

When we run the scripts, the first prints out ‘entered bar’ and then yields:

LUA> pluto version Tamed Pluto 1.0
LUA> entered bar

The second script loads the paused ‘foo’ and ‘bar’ and continues from the yield:

LUA> pluto version Tamed Pluto 1.0
LUA> And hello from a long dead variable!
LUA> 4

Having the ability to run a script to the point at which it starts a long running call, serialize its state and store it somewhere, then pick up that serialized state and resume it at another place and time is a very powerful capability. It means we can write simple procedural scripts for our workflows, but have them execute over a distributed service oriented architecture.

The complete code for this experiment is on GitHub here.

Thursday, April 04, 2013

Lua as a Distributed Workflow Scripting Language

I’ve been spending a lot of time recently thinking about ways of orchestrating long running workflows in a service oriented architecture. I was talking this over at last Tuesday’s Brighton ALT NET when Jay Kannan, who’s a game developer amongst other things, mentioned that Lua is a popular choice for scripting game platforms. Maybe I should check it out. So I did. And it turns out to be very interesting.

If you haven’t heard of Lua, it’s a “powerful, fast, lightweight, embeddable scripting language” originally conceived by a team at the Catholic University of Rio de Janeiro in Brazil. It’s the leading scripting language for game platforms and also pops up in other interesting locations including Photoshop and Wikipedia. It’s got a straightforward C API that makes it relatively simple to p-invoke from .NET, and indeed there’s a LuaInterface library that provides a managed API.

I got the source code from the Google code svn repository and built it in VS 2012, but there are NuGet packages available as well.

It turned out to be very simple to use Lua to script a distributed workflow. Lua has first class coroutines which means that you can pause and continue a Lua script at will. The LuaInterface library allows you inject C# functions and call them as Lua functions, so it’s simply a case of calling an asynchronous C# ‘begin’ function, suspending the script by yielding the coroutine, waiting for the asynchronous function to return, setting the return value, and starting up the script again.

Let me show you how.

First here’s a little Lua script:

a = 5
b = 6

print('doing remote add ...')

r1 = remoteAdd(a, b)

print('doing remote multiply ...')

r2 = remoteMultiply(r1, 4)

print('doing remote divide ...')

r3 = remoteDivide(r2, 2)

print(r3)

The three functions ‘remoteAdd’, ‘remoteMultiply’ and ‘remoteDivide’ are all asynchronous. Behind the scenes a message is sent via RabbitMQ to a remote OperationServer where the calculation is carried out and a message is returned.

The script runs in my LuaRuntime class. This creates and sets up the Lua environment that the script runs in:

public class LuaRuntime : IDisposable
{
private readonly Lua lua = new Lua();
private readonly Functions functions = new Functions();

public LuaRuntime()
{
lua.RegisterFunction("print", functions, typeof(Functions).GetMethod("Print"));
lua.RegisterFunction("startOperation", this, GetType().GetMethod("StartOperation"));

lua.DoString(
@"
function remoteAdd(a, b) return remoteOperation(a, b, '+'); end
function remoteMultiply(a, b) return remoteOperation(a, b, '*'); end
function remoteDivide(a, b) return remoteOperation(a, b, '/'); end

function remoteOperation(a, b, op)
startOperation(a, b, op)
local cor = coroutine.running()
coroutine.yield(cor)

return LUA_RUNTIME_OPERATION_RESULT
end
"
);
}

public void StartOperation(int a, int b, string operation)
{
functions.RunOperation(a, b, operation, result =>
{
lua["LUA_RUNTIME_OPERATION_RESULT"] = result;
lua.DoString("coroutine.resume(co)");
});
}

public void Execute(string script)
{
const string coroutineWrapper =
@"co = coroutine.create(function()
{0}
end)"
;
lua.DoString(string.Format(coroutineWrapper, script));
lua.DoString("coroutine.resume(co)");
}

public void Dispose()
{
lua.Dispose();
functions.Dispose();
}
}

When this class is instantiated it creates a new LuaInterface environment (the Lua class) and a new instance of a Functions class that I’ll explain below.

The constructor is where most of the interesting setup happens. First we register two C# functions that we want to call from inside Lua: ‘print’ which simply prints from the console, and ‘startOperation’ which starts an asynchronous math operation.

Next we define our three functions: ‘remoteAdd’, ‘remoteMultiply’ and ‘remoteDivide’ which all in turn invoke a common function ‘remoteOperation’. RemoteOperation calls the registered C# function ‘startOperation’ then yields the currently running coroutine. Effectively the script will stop here until it’s started again. After it starts, the result of the asynchronous operation is accessed from the  LUA_RUNTIME_OPERATION_RESULT variable and returned to the caller.

The C# function StartOperation calls RunOperation on our Functions class which has an asynchronous callback. In the callback we set the result value in the Lua environment and execute ‘coroutine.resume’ which restarts the Lua script at the point where it yielded.

The Execute function actually runs the script. First it embeds it in a ‘coroutine.create’ call so that the entire script is created as a coroutine, then it simply starts the coroutine by calling ‘coroutine.resume’.

The Functions class is just a wrapper around a function that maintains an EasyNetQ connection to RabbitMQ and makes an EasyNetQ request to a remote server somewhere else on the network.

public class Functions : IDisposable
{
private readonly IBus bus;

public Functions()
{
bus = RabbitHutch.CreateBus("host=localhost");
}

public void Dispose()
{
bus.Dispose();
}

public void RunOperation(int a, int b, string operation, Action<int> resultCallback)
{
using (var channel = bus.OpenPublishChannel())
{
var request = new OperationRequest()
{
A = a,
B = b,
Operation = operation
};
channel.Request<OperationRequest, OperationResponse>(request, response =>
{
Console.WriteLine("Got response {0}", response.Result);
resultCallback(response.Result);
});
}
}

public void Print(string msg)
{
Console.WriteLine("LUA> {0}", msg);
}
}
 
Here’s a sample run of the script:
 
DEBUG: Trying to connect
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'localhost', Port: 5672, VHost: '/'
LUA> doing remote add ...
DEBUG: Declared Consumer. queue='easynetq.response.143441ff-3635-4d5d-8e42-6b379b3f8356', prefetchcount=50
DEBUG: Published to exchange: 'easy_net_q_rpc', routing key: 'Mike_DistributedLua_Messages_OperationRequest:Mike_DistributedLua_Messages', correlationId: '50560dd9-2be1-49a1-96f6-9c62641080ae'
DEBUG: Recieved
RoutingKey: 'easynetq.response.143441ff-3635-4d5d-8e42-6b379b3f8356'
CorrelationId: '50560dd9-2be1-49a1-96f6-9c62641080ae'
ConsumerTag: '101343d9-9497-4893-88e6-b89cc1de29a4'
Got response 11
LUA> doing remote multiply ...
DEBUG: Declared Consumer. queue='easynetq.response.f571f6d7-b963-4a88-bf62-f05785009e39', prefetchcount=50
DEBUG: Published to exchange: 'easy_net_q_rpc', routing key: 'Mike_DistributedLua_Messages_OperationRequest:Mike_DistributedLua_Messages', correlationId: '0ea7e1c3-6f12-4cb9-a861-2f5de8f2600d'
DEBUG: Model Shutdown for queue: 'easynetq.response.143441ff-3635-4d5d-8e42-6b379b3f8356'
DEBUG: Recieved
RoutingKey: 'easynetq.response.f571f6d7-b963-4a88-bf62-f05785009e39'
CorrelationId: '0ea7e1c3-6f12-4cb9-a861-2f5de8f2600d'
ConsumerTag: '2c35f24e-7745-4475-885a-d214a1446a70'
Got response 44
LUA> doing remote divide ...
DEBUG: Declared Consumer. queue='easynetq.response.060f7882-685c-4b00-a930-aa4f20f7c057', prefetchcount=50
DEBUG: Published to exchange: 'easy_net_q_rpc', routing key: 'Mike_DistributedLua_Messages_OperationRequest:Mike_DistributedLua_Messages', correlationId: 'ea9a90cc-cd7d-4f05-b171-c6849026ac4a'
DEBUG: Model Shutdown for queue: 'easynetq.response.f571f6d7-b963-4a88-bf62-f05785009e39'
DEBUG: Recieved
RoutingKey: 'easynetq.response.060f7882-685c-4b00-a930-aa4f20f7c057'
CorrelationId: 'ea9a90cc-cd7d-4f05-b171-c6849026ac4a'
ConsumerTag: '90e6b024-c5c4-440a-abdf-cb9a000c131c'
Got response 22
LUA> 22
DEBUG: Model Shutdown for queue: 'easynetq.response.060f7882-685c-4b00-a930-aa4f20f7c057'
Completed
DEBUG: Connection disposed

You can see the Lua print statements interleaved with EasyNetQ DEBUG statements showing the messages being published and consumed.
 
So there you go, a distributed workflow scripting engine in under 100 lines of code. All I’ve got to do now is serialize the Lua environment at each yield and then restart it again from its serialized state. This is possible according to a bit of googling yesterday afternoon. Watch this space.
 
You can find the code for all this on GitHub here: