A Runnable Scheduler for Reactive Extensions

Foreword

Reactive Extensions (Rx) is a framework that makes it easy to coordinate asynchronous streams of events.

Until now, I have been using Rx in "fat client" apps built with WPF, WinRT, or Windows Phone. The support for those apps is great.

Using Rx in ASP.NET MVC

Since I was designing a system built on ASP.NET MVC, I thought it would be nice to use some Rx patterns.
At some point you might need to call native functions or access objects tied to the HttpContext. Those objects should be accessed on the thread provided by ASP.NET, a constraint known as thread affinity. The problem is how to get Rx to run its work on that thread.

In the WPF/WinRT model, you would simply call the dispatcher or use the DispatcherScheduler provided by Rx, which relies on the message pump of the main thread. In the ASP.NET world, there is no such thing.

Fortunately, Rx provides a message pump scheduler in the EventLoopScheduler class.

Wrapping the EventLoopScheduler

The EventLoopScheduler already behaves like a Dispatcher, but without priorities. A single thread executes all actions one at a time. By default, the EventLoopScheduler creates a new thread and uses it to "run" the loop of actions. In our case, we will use the overloaded constructor that accepts a thread factory.
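To make the default behavior concrete, here is a minimal console sketch (not part of the original project; the Program class and the messages are made up for illustration). It assumes the System.Reactive package is referenced and schedules two actions on a default EventLoopScheduler, printing the thread each one runs on.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

public static class Program
{
    public static void Main()
    {
        using (var scheduler = new EventLoopScheduler())
        using (var done = new ManualResetEvent(false))
        {
            Console.WriteLine("Main thread: " + Thread.CurrentThread.ManagedThreadId);

            // Both actions are queued on the scheduler's own thread
            // and executed one after the other.
            scheduler.Schedule(() =>
                Console.WriteLine("First action on thread: " + Thread.CurrentThread.ManagedThreadId));

            scheduler.Schedule(() =>
            {
                Console.WriteLine("Second action on thread: " + Thread.CurrentThread.ManagedThreadId);
                done.Set(); // signal the main thread that the queue has been drained
            });

            // Keep the main thread alive until the second action has run.
            done.WaitOne();
        }
    }
}
```

Both actions should report the same thread id, and that id should differ from the main thread's. That difference is exactly what the thread factory overload lets us change.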

Let's create a RunnableScheduler class:

public class RunnableScheduler : IScheduler, IDisposable { }

Put some required fields in it:

private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
private readonly EventLoopScheduler _scheduler;
private ThreadStart _threadStart;

A constructor:

public RunnableScheduler()
{
    _scheduler = new EventLoopScheduler(ts =>
    {
        _threadStart = ts;
        _resetEvent.Set();
        return new Thread(() => { });
    });
}

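As you can see, the factory method returns a new thread, but that thread does nothing and is not used to run the action loop. Instead, we keep the ThreadStart delegate that Rx hands to the factory and provide a Run method on our scheduler, which executes the loop on the calling thread: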

public void Run()
{
    _resetEvent.WaitOne();
    _threadStart();
}

Finally, the last thing required is a Dispose method. It is needed to release the thread from the Run method: Run does not return until the scheduler is disposed.

public void Dispose()
{
    _scheduler.Schedule(_scheduler.Dispose);
    _resetEvent.Dispose();
}
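The snippets above leave out the IScheduler members that the class declaration requires. Here is a sketch of the whole class, assuming those members simply forward to the wrapped EventLoopScheduler (that forwarding is my assumption, not something shown above). The Program class at the end is a hypothetical console harness, there only to exercise Run and Dispose outside of ASP.NET.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

public class RunnableScheduler : IScheduler, IDisposable
{
    private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
    private readonly EventLoopScheduler _scheduler;
    private ThreadStart _threadStart;

    public RunnableScheduler()
    {
        _scheduler = new EventLoopScheduler(ts =>
        {
            _threadStart = ts;             // keep the loop body for Run()
            _resetEvent.Set();             // tell Run() the loop body is available
            return new Thread(() => { });  // dummy thread that does nothing
        });
    }

    // Assumption: every IScheduler member forwards to the inner scheduler.
    public DateTimeOffset Now
    {
        get { return _scheduler.Now; }
    }

    public IDisposable Schedule<TState>(TState state,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, action);
    }

    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }

    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return _scheduler.Schedule(state, dueTime, action);
    }

    // Runs the action loop on the calling thread until the scheduler is disposed.
    public void Run()
    {
        _resetEvent.WaitOne();
        _threadStart();
    }

    public void Dispose()
    {
        _scheduler.Schedule(_scheduler.Dispose);
        _resetEvent.Dispose();
    }
}

// Hypothetical console harness, for illustration only.
public static class Program
{
    public static void Main()
    {
        var scheduler = new RunnableScheduler();
        Console.WriteLine("Caller thread: " + Thread.CurrentThread.ManagedThreadId);

        scheduler.Schedule(() =>
        {
            Console.WriteLine("Action thread: " + Thread.CurrentThread.ManagedThreadId);
            scheduler.Dispose(); // lets Run() return
        });

        scheduler.Run(); // blocks here while the loop processes the action
        Console.WriteLine("Run returned");
    }
}
```

The two thread ids should match, since the loop executes inside Run on the caller's thread. Note that Dispose also disposes the wait handle, so in this sketch it should only be called once Run has started, as the scheduled action does here.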

Using the RunnableScheduler in ASP.NET MVC

Suppose you want to delay a greeting message for one second inside a Controller action. Here is what you could do:

public class HomeController : Controller
{
    public ActionResult Index()
    {
        var runnableScheduler = new RunnableScheduler();
        var model = new MyModel();

        Observable
            .Timer(TimeSpan.FromSeconds(1), runnableScheduler)
            .Select(_ => "Hello John Smith")
            .Finally(runnableScheduler.Dispose)
            .Subscribe(greetings => model.Hello = greetings);

        runnableScheduler.Run();

        return View(model);
    }
}

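The ASP.NET thread blocks in the runnableScheduler.Run method, and the timer callback and the subscription run on that same thread. The Finally operator disposes the scheduler once the sequence terminates, that is, after the message has been received, which releases the thread.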

Final Words

We have seen that it is possible to use Rx inside ASP.NET and reuse the provided thread by running a special scheduler.

By extending this technique, I found that Rx could be really useful for calling web services and managing cancellation inside an ASP.NET MVC controller. In a future post, I may show you more on this topic.


God bless you!
