A Runnable Scheduler for Reactive Extensions

Foreword

Reactive Extensions (Rx) is a framework that makes it easy to coordinate asynchronous streams of events.

Until now, I have been using Rx in « fat client » apps built with WPF, WinRT, or Windows Phone. The support for those apps is great.

Using Rx in ASP.NET MVC

Since I was designing a system built on ASP.NET MVC, I thought it would be nice to use some Rx patterns.
At some point you might need to call native functions or access objects tied to the HttpContext. Those objects should be accessed on the thread provided by ASP.NET, a constraint known as thread affinity. The question is how to get Rx to run its work on that thread.

In the WPF/WinRT model, you would simply call the dispatcher or use the DispatcherScheduler provided by Rx, which posts work to the message pump of the main thread. In the ASP.NET world, there is no such thing.

Fortunately, Rx provides a message pump scheduler in the EventLoopScheduler class.

Wrapping the EventLoopScheduler

The EventLoopScheduler already behaves like a Dispatcher, but without priorities. A single thread executes all actions one at a time. By default, the EventLoopScheduler creates a new thread and uses it to « run » the loop of actions. In our case, we will use the overloaded constructor that accepts a thread factory.

Let’s create a RunnableScheduler class:

public class RunnableScheduler : IScheduler, IDisposable { }

Add the required fields:

private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
private readonly EventLoopScheduler _scheduler;
private ThreadStart _threadStart;

Then a constructor:

public RunnableScheduler()
{
    _scheduler = new EventLoopScheduler(ts =>
    {
        _threadStart = ts;
        _resetEvent.Set();
        return new Thread(() => { });
    });
}

As you can see, the factory method returns a new thread, but that thread is not used to run the action loop. Instead, we capture the ThreadStart delegate and expose a Run method on our scheduler that executes it on the calling thread:

public void Run()
{
    _resetEvent.WaitOne();
    _threadStart();
}

And finally, the last thing required is a Dispose method. It releases the thread blocked in the Run method: Run does not return until the scheduler is disposed.

public void Dispose()
{
    _scheduler.Schedule(_scheduler.Dispose);
    _resetEvent.Dispose();
}
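
The snippets above do not show the IScheduler members that the class declaration requires. Here is a minimal sketch of the complete class, assuming those members simply forward to the inner EventLoopScheduler. The forwarding is an assumption, not something shown above, and the code needs the System.Reactive package:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

// Sketch only: the IScheduler members forward to the inner EventLoopScheduler.
public sealed class RunnableScheduler : IScheduler, IDisposable
{
    private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
    private readonly EventLoopScheduler _scheduler;
    private ThreadStart _threadStart;

    public RunnableScheduler()
    {
        _scheduler = new EventLoopScheduler(ts =>
        {
            _threadStart = ts;             // capture the loop body instead of running it
            _resetEvent.Set();             // signal Run() that the loop body is available
            return new Thread(() => { });  // placeholder thread that exits immediately
        });
    }

    public DateTimeOffset Now
    {
        get { return _scheduler.Now; }
    }

    public IDisposable Schedule<TState>(TState state,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, action);
    }

    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }

    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }

    // Blocks the calling thread and runs the event loop on it.
    public void Run()
    {
        _resetEvent.WaitOne();
        _threadStart();
    }

    // Queues the disposal of the inner scheduler so that Run() can return.
    public void Dispose()
    {
        _scheduler.Schedule(_scheduler.Dispose);
        _resetEvent.Dispose();
    }
}
```

With this forwarding, a scheduled action receives the inner EventLoopScheduler as its IScheduler argument, so any work it reschedules stays on the same loop.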

Using the RunnableScheduler in ASP.NET MVC

Suppose you want to delay a greeting message for one second inside a controller action. Here is what you could do:

public class HomeController : Controller
{
    public ActionResult Index()
    {
        var runnableScheduler = new RunnableScheduler();
        var model = new MyModel();

        Observable
            .Timer(TimeSpan.FromSeconds(1), runnableScheduler)
            .Select(_ => "Hello John Smith")
            .Finally(runnableScheduler.Dispose)
            .Subscribe(greetings => model.Hello = greetings);

        runnableScheduler.Run();

        return View(model);
    }
}

The ASP.NET thread blocks in the runnableScheduler.Run method. The Finally operator disposes the scheduler, which releases the thread once the message has been received.

Final Words

We have seen that it is possible to use Rx inside ASP.NET and reuse the provided thread by running a special scheduler.

By extending this technique, I found that Rx could be really useful for calling web services and managing cancellation inside an ASP.NET MVC controller. In a future post, I may show you more on this topic.


God bless you!

Task And Cancellation


Foreword

C# 5 has integrated support for the Task Parallel Library directly into the language. Those who are familiar with tasks and ContinueWith know how painful it was before the async / await keywords appeared.
The async and await keywords let us write asynchronous code with ordinary control flow and try / catch blocks, which greatly simplifies the code.

One scenario, though, that is missing some love is cancellation. Here is some sample code that does not support cancellation:



static void Main(string[] args)
{
    WriteInConsole();
    Console.ReadLine();
    Console.WriteLine("Key Press!");
    Console.ReadLine();
}

private static async void WriteInConsole()
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        await Task.Delay(1000);
        Console.WriteLine("Task running {0}", i);
    }
}

Will output:

Task running 0
Task running 1
Task running 2
Key Press!
Task running 3
Task running 4
...

When you press Enter, the WriteInConsole method keeps running. This is fine: it shows that the async method is working correctly and continues independently of the main thread, which is blocked in Console.ReadLine. Now, let’s add cancellation support.

Cancelling a Task with a CancellationToken

The prescribed way of cancelling a task is to pass a CancellationToken to the task when it is created. A CancellationToken is a value type (a struct) that can be passed to algorithms in order to coordinate the cancellation of a group of operations.
A CancellationToken does not have a Cancel method. To create a CancellationToken, you must first create a CancellationTokenSource, which issues the token and is the object you call Cancel on.
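
To illustrate the split between the source and the token, here is a small sketch. The TokenBasics class and its Demo method are hypothetical names used only for this example:

```csharp
using System;
using System.Threading;

public static class TokenBasics
{
    // Returns { requested before Cancel, requested after Cancel, callback ran }.
    public static bool[] Demo()
    {
        using (var source = new CancellationTokenSource())
        {
            // The token is a read-only view: it can observe cancellation
            // but it cannot trigger it.
            CancellationToken token = source.Token;

            var callbackRan = false;
            token.Register(() => callbackRan = true);

            var before = token.IsCancellationRequested;

            // Only the source can request cancellation.
            source.Cancel();

            var after = token.IsCancellationRequested;
            return new[] { before, after, callbackRan };
        }
    }
}
```

Calling TokenBasics.Demo() should return false, true, true: the token reports the cancellation only after the source has requested it, and the registered callback runs as part of Cancel.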
Here is the code that supports cancellation :

static void Main(string[] args)
{
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;

    Task.Run(() => WriteInConsole(token), token);

    Console.ReadLine();
           
    tokenSource.Cancel();
    Console.WriteLine("Key press!");
           
    Console.ReadLine();
}

private static void WriteInConsole(CancellationToken token)
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        Task.Delay(1000).Wait(token);
        Console.WriteLine("Task running {0}", i);
    }
}

Will output :

Task running 0
Task running 1
Task running 2
Key press!

When you press Enter, the task running the WriteInConsole method stops, and so does the Wait inside it. In fact, Task.Delay(1000).Wait(token) throws an OperationCanceledException, which ends the loop automatically.
Notice that the code holds a reference to a CancellationTokenSource and has to pass a CancellationToken to every algorithm in order to stop the work correctly.
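
For comparison, the same loop can stay asynchronous by passing the token to Task.Delay instead of blocking in Wait. This is only a sketch: the CancellableLoop class, the RunAsync method and the delayMs parameter are hypothetical names introduced for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableLoop
{
    // Runs until the token is cancelled and returns the number of
    // iterations that completed.
    public static async Task<int> RunAsync(CancellationToken token, int delayMs = 1000)
    {
        var iterations = 0;
        try
        {
            while (true)
            {
                // Task.Delay observes the token and ends in the canceled state,
                // so awaiting it throws an OperationCanceledException.
                await Task.Delay(delayMs, token);
                Console.WriteLine("Task running {0}", iterations);
                iterations++;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way out of the loop.
        }
        return iterations;
    }
}
```

The caller still owns a CancellationTokenSource and still has to hand the token down, so this variant does not remove the plumbing described above. It only avoids blocking a thread while waiting.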
Let’s see how we can improve things a bit.

Removing the CancellationToken noise

We will remove every CancellationTokenSource and CancellationToken from the code so we can focus on the stuff that matters. Here is what the program should look like:

static void Main(string[] args)
{
    var t = Tasks.StartWithCancellation(WriteInConsole);
    Console.ReadLine();
           
    t.Cancel();

    Console.WriteLine("Key press!");           
    Console.ReadLine();
}

private static void WriteInConsole()
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        Task.Delay(1000).WaitWithCancellation();
        Console.WriteLine("Task running {0}", i);
    }
}

As you can see, we removed all the noise. Looking at the code, you will see that instead of using Task.Run, we use Tasks.StartWithCancellation because we want to create the task with a CancellationToken.

Diving deep into the bits...

Create a custom Task Factory method

We will create a task factory method for supporting cancellation and an ambient context. Here is what the code looks like :

public static Task StartWithCancellation(Action action)
{
    var source = new CancellationTokenSource();
    var token = source.Token;

    if (_ambientToken.CanBeCanceled)
    {
        token = CancellationTokenSource
            .CreateLinkedTokenSource(token, _ambientToken)
            .Token;
    }

    Action a = () =>
    {
        _ambientToken = token;
        try
        {
            action();
        }
        finally
        {
            _ambientToken = CancellationToken.None;
        }
    };

    var task = Task.Factory.StartNew(a, token);

    _sources.Add(task, source);
           
    return task;
}

The first important thing here is that we are using an ambient token. The ambient token is a static field marked with the ThreadStatic attribute.

[ThreadStatic]
private static CancellationToken _ambientToken;

This is useful because it gives us a contextual CancellationToken relative to the current thread, so we can « enlist » more tasks or waits using the same propagated CancellationToken.
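
To make the per-thread behaviour concrete, here is a small sketch. The AmbientDemo class is a hypothetical name, and an int stands in for the token to keep the example short:

```csharp
using System;
using System.Threading;

public static class AmbientDemo
{
    // Each thread gets its own copy of this field.
    [ThreadStatic]
    private static int _value;

    // Sets the field on the calling thread, then reads it from a new thread.
    public static int ReadFromOtherThread(int valueOnThisThread)
    {
        _value = valueOnThisThread;

        var seen = -1;
        var thread = new Thread(() => seen = _value);
        thread.Start();
        thread.Join();

        // The new thread never assigned the field, so it sees the default value.
        return seen;
    }
}
```

AmbientDemo.ReadFromOtherThread(42) should return 0, not 42. This is why StartWithCancellation assigns _ambientToken inside the wrapping action, on the thread that actually runs the task, rather than on the thread that creates it.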
Another important thing here is that we need to allow fluent cancellation of the task. To do this, we need to attach a CancellationTokenSource to the Task. Now, starting with .NET 4.0, there is a very neat way of doing this by using a ConditionalWeakTable.

private static readonly ConditionalWeakTable<Task, CancellationTokenSource> _sources = new ConditionalWeakTable<Task, CancellationTokenSource>();

Here is how we link the CancellationTokenSource to the Task:

_sources.Add(task, source);
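
As a standalone illustration of the technique, here is a sketch that attaches a label to an arbitrary object. The Attached class and its methods are hypothetical names used only for this example:

```csharp
using System.Runtime.CompilerServices;

public static class Attached
{
    // The table holds its keys weakly, so attaching a value
    // does not keep the target object alive.
    private static readonly ConditionalWeakTable<object, string> Labels =
        new ConditionalWeakTable<object, string>();

    public static void SetLabel(object target, string label)
    {
        // Add throws an ArgumentException if the key is already present.
        Labels.Add(target, label);
    }

    public static string GetLabelOrNull(object target)
    {
        string label;
        // TryGetValue only reads; it never creates an entry.
        return Labels.TryGetValue(target, out label) ? label : null;
    }
}
```

Because the keys are held weakly, an entry goes away once its key object is collected. That is what makes the table a good fit for attaching a CancellationTokenSource to a Task without managing its lifetime by hand.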

Add a Cancel method on the Task

By using the ConditionalWeakTable, we can look up the CancellationTokenSource attached to the task and cancel it. Instead of deriving from Task and adding a method to the subclass, we will use an extension method to add a Cancel operation to the task.

public static void Cancel(this Task task)
{
    // TryGetValue only reads; GetValue would add an entry for unknown tasks.
    CancellationTokenSource cancellationTokenSource;
    if (_sources.TryGetValue(task, out cancellationTokenSource))
        cancellationTokenSource.Cancel();
}

Wait with ambient cancellation

The last method we need to add is a Wait that can automatically cancel waiting based on our ambient CancellationToken. We do this with the following code :

public static void WaitWithCancellation(this Task task)
{
    task.Wait(_ambientToken);
}

Final Words

As you can see, we can do something to improve the task cancellation scenario. I really wish that better cancellation support will come in the next version of the language or of .NET.
Happy coding.