Friday, November 05, 2010

Composing Sequential Tasks With Linq

This is a continuation of my Task Parallel Library investigations. Yesterday I wrote about using TPL with MVC.

Say we have a number of asynchronous tasks that we want to execute in series because the result of the first task is an input value to the second task, and the result of the second task is an input to the third. To demonstrate, I’ve created two simple task generators:

static Task<int> CreateInt(int a)
{
    return Task<int>.Factory.StartNew(() =>
    {
        Console.WriteLine("Starting CreateInt   {0}", a);
        Thread.Sleep(1000);
        Console.WriteLine("Completing CreateInt {0}", a);
        return a;
    });
}

static Task<int> AddInts(int a, int b)
{
    return Task<int>.Factory.StartNew(() =>
    {
        Console.WriteLine("Starting AddInts     {0} + {1}", a, b);
        Thread.Sleep(1000);
        Console.WriteLine("Completing AddInts   {0} + {1}", a, b);
        return a + b;
    });
}

I want to create an int and then add 3 to it, and then add 4. It’s difficult to compose these using the standard ‘ContinueWith’ callback:

public void ComposeWithContinueWith()
{
    var result = CreateInt(2)
        .ContinueWith(t1 => AddInts(t1.Result, 3)
            .ContinueWith(t2 => AddInts(t2.Result, 4))
            );

    // result is the first task, how do you get the third task's result?
}

You can simply put two lots of ‘Unwrap()’ at the end of the expression:

public void ComposeWithContinueWith() { var result = CreateInt(2) .ContinueWith(t1 => AddInts(t1.Result, 3) .ContinueWith(t2 => AddInts(t2.Result, 4)) ).Unwrap().Unwrap();

Console.WriteLine("Completed with result {0}", result.Result); }

But there is a much nicer way. But because tasks are Monadic, you can compose them using Linq:

Update / Correction: The out-of-the-box Task<T> doesn’t have Linq methods (SelectMany etc) built in.

However, there is an implementation in the ParallelExtensionsExtras assembly. I confused myself (it happens a lot) because I’d included the ParallelExtensionsExtras for the Task extension methods on SmtpClient and SqlDataReader, and has simply assumed that Task<T> has the Linq extension methods built in.

You can build the ParallelExtensionsExtras.dll yourself from the Samples for Parallel Programming. Alternatively, you can just grab a compiled ParallelExtensionsExtras.dll from my sample solution at: https://github.com/mikehadlow/Suteki.AsyncMvcTpl. Stephen Toub has a great write up on the goodies in the Parallel Extensions Extras library here, it’s a great read.

Anyway, so once you have a reference to ParallelExtensionsExtras, you can compose Tasks using Linq expressions:

public void CanComposeTasksWithLinq()
{
    var result = from a in CreateInt(2)
                 from b in AddInts(a, 3)
                 from c in AddInts(b, 4)
                 select c;

    Console.WriteLine("Completed with result {0}", result.Result);
}

Which outputs:

Starting CreateInt   2
Completing CreateInt 2
Starting AddInts     2 + 3
Completing AddInts   2 + 3
Starting AddInts     5 + 4
Completing AddInts   5 + 4
Completed with result 9

This is a really nice pattern to use if you have a number of async IO tasks to do in series.

4 comments:

Ricardo Peres said...

Doesn't compile:

'System.Collections.Generic.IEnumerable' does not contain a definition for 'Result' and no extension method 'Result' accepting a first argument of type 'System.Collections.Generic.IEnumerable' could be found (are you missing a using directive or an assembly reference?)

Could not find an implementation of the query pattern for source type 'System.Threading.Tasks.Task'. 'SelectMany' not found.

Mike Hadlow said...

Hi rjperes,

You are using .NET 4.0 right?

You can get the complete solution with this code in it here:
https://github.com/mikehadlow/Suteki.AsyncMvcTpl

The class with this example is here:
https://github.com/mikehadlow/Suteki.AsyncMvcTpl/blob/master/Suteki.AsyncMvcTpl/Spikes/TaskAndLinq.cs

Ricardo Peres said...

Hello, Mike, thanks for the quick reply!
Yes, I'm using .NET 4.0 (full profile), it doesn't compile. Perhaps there is some assembly missing?
Thanks,

Ricardo Peres said...

Found it! It's the ParallelExtensionsExtras that were missing. Thanks for your help!
R