A Runnable Scheduler for Reactive Extensions

Foreword

Reactive Extensions (Rx) is a framework that makes it easy to coordinate asynchronous streams of events.

Until now, I have been using Rx in « fat client » apps built with WPF, WinRT, or Windows Phone. The support for those apps is great.

Using Rx in ASP.NET MVC

Since I was designing a system built on ASP.NET MVC, I thought it would be nice to use some Rx patterns.
At some point you might need to call some native functions or access objects related to the HttpContext. Those objects should be accessed on the thread provided by ASP.NET, a requirement known as thread affinity. The question is how to do that.

In the WPF/WinRT model, you would simply call the dispatcher or use the DispatcherScheduler provided by Rx, which does message pumping on the main thread. In the ASP.NET world, there is no such thing.

Fortunately, Rx provides a message pump scheduler in the EventLoopScheduler class.

Wrapping the EventLoopScheduler

The EventLoopScheduler already behaves like a Dispatcher, but without priorities. A single thread is used to execute all actions one at a time. By default, the EventLoopScheduler creates a new thread and uses it to « run » the loop of actions. In our case, we will use the overloaded constructor to provide a thread factory.
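To make the "one thread, one action at a time" idea concrete, here is a minimal sketch of such a loop built only on BlockingCollection. It is an illustration under my own assumptions, not how Rx implements EventLoopScheduler; the MiniEventLoop class and its Post, Run and Complete methods are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical illustration only: NOT the Rx EventLoopScheduler implementation.
public sealed class MiniEventLoop : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();

    // Queue an action; it runs later on whichever thread calls Run.
    public void Post(Action action)
    {
        _queue.Add(action);
    }

    // Block the calling thread and execute queued actions one at a time,
    // until Complete has been called and the queue is drained.
    public void Run()
    {
        foreach (var action in _queue.GetConsumingEnumerable())
            action();
    }

    // Stop accepting new actions so that Run can return.
    public void Complete()
    {
        _queue.CompleteAdding();
    }

    public void Dispose()
    {
        _queue.Dispose();
    }
}
```

The thread that calls Run is the one that executes every posted action, which is the same property we want from the scheduler built in the rest of this post.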

Let’s create a RunnableScheduler class:

public class RunnableScheduler : IScheduler, IDisposable { }

Put some required fields in it :

private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
private readonly EventLoopScheduler _scheduler;
private ThreadStart _threadStart;

A constructor

public RunnableScheduler()
{
    _scheduler = new EventLoopScheduler(ts =>
    {
        _threadStart = ts;
        _resetEvent.Set();
        return new Thread(() => { });
    });
}

As you can see, the factory method returns a new thread, but that thread is not the one that runs the action loop. Instead, we capture the ThreadStart delegate and expose a Run method on our scheduler, so the caller's own thread runs the loop:

public void Run()
{
    _resetEvent.WaitOne();
    _threadStart();
}

Finally, the last thing required is a Dispose method. It is needed to release the thread from the Run method: Run does not return until the scheduler is disposed.

public void Dispose()
{
    _scheduler.Schedule(_scheduler.Dispose);
    _resetEvent.Dispose();
}

Using the RunnableScheduler in ASP.NET MVC

Suppose you want to delay a greeting message for one second inside a Controller action. Here is what you could do:

public class HomeController : Controller
{
    public ActionResult Index()
    {
        var runnableScheduler = new RunnableScheduler();
        var model = new MyModel();

        Observable
            .Timer(TimeSpan.FromSeconds(1), runnableScheduler)
            .Select(_ => "Hello John Smith")
            .Finally(runnableScheduler.Dispose)
            .Subscribe(greetings => model.Hello = greetings);

        runnableScheduler.Run();

        return View(model);
    }
}

The ASP.Net thread will block on the runnableScheduler.Run method. The Finally Rx operator will release the thread once the message is received.

Final Words

We have seen that it is possible to use Rx inside ASP.NET and reuse the provided thread by running a special scheduler.

By extending this technique, I found that Rx could be really useful for calling web services and managing cancellation inside an ASP.NET MVC controller. In a future post, I may show you more on this topic.


God bless you!

Task And Cancellation


Foreword

C# 5 has integrated support for the Task Parallel Library directly into the language. Those who are familiar with tasks and ContinueWith know how painful it was before the introduction of the async / await keywords.
The async and await keywords let us write asynchronous code with ordinary control flow and try/catch blocks, which greatly simplifies the code.

One scenario, though, that is still missing some love is cancellation. Here is some sample code that does not support cancellation:



static void Main(string[] args)
{
    WriteInConsole();
    Console.ReadLine();
    Console.WriteLine("Key Press!");
    Console.ReadLine();
}

private static async void WriteInConsole()
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        await Task.Delay(1000);
        Console.WriteLine("Task running {0}", i);
    }
}

Will output:

Task running 0
Task running 1
Task running 2
Key Press!
Task running 3
Task running 4
...

When you press Enter, the task running the WriteInConsole method continues to work. This is fine, since it shows that the async method keeps running in the background while the main thread moves on. Now, let’s add cancellation support.

Cancelling a Task with a CancellationToken

The prescribed way of cancelling a task is to pass a CancellationToken to the task when it is created. A CancellationToken is a value type that can be passed to algorithms in order to coordinate the cancellation of a group of operations.
A CancellationToken does not contain a Cancel method. To create a CancellationToken, you must first create a CancellationTokenSource, which issues the CancellationToken.
Here is the code that supports cancellation:

static void Main(string[] args)
{
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;

    Task.Run(() => WriteInConsole(token), token);

    Console.ReadLine();
           
    tokenSource.Cancel();
    Console.WriteLine("Key press!");
           
    Console.ReadLine();
}

private static void WriteInConsole(CancellationToken token)
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        Task.Delay(1000).Wait(token);
        Console.WriteLine("Task running {0}", i);
    }
}

Will output :

Task running 0
Task running 1
Task running 2
Key press!

When you press Enter, the task running the WriteInConsole method stops working, as does the Wait inside the WriteInConsole method. In fact, Task.Delay(1000).Wait(token) throws an OperationCanceledException, which ends the loop automatically.
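The exception behaviour can be checked in isolation. The following is a small sketch, assuming .NET 4.5 or later for CancelAfter; the CancellationDemo class and WaitWasCancelled method are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Returns true when the blocking wait was interrupted by cancellation.
    public static bool WaitWasCancelled(TimeSpan cancelAfter)
    {
        using (var source = new CancellationTokenSource())
        {
            // Ask the source to cancel its token after the given delay.
            source.CancelAfter(cancelAfter);
            try
            {
                // A long delay; the token interrupts the wait well before it elapses.
                Task.Delay(TimeSpan.FromSeconds(30)).Wait(source.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                // Wait(token) throws as soon as the token is cancelled.
                return true;
            }
        }
    }
}
```

Calling CancellationDemo.WaitWasCancelled(TimeSpan.FromMilliseconds(50)) returns true: the wait is abandoned even though the delay task itself has not completed.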
Notice that the code holds a reference to a CancellationTokenSource and has to pass a CancellationToken to every algorithm in order to stop the threads correctly.
Let’s see how we can improve things a bit.

Removing the CancellationToken noise

We will remove every CancellationTokenSource and CancellationToken from the code so we can focus on the stuff that matters. Here is what the program should look like:

static void Main(string[] args)
{
    var t = Tasks.StartWithCancellation(WriteInConsole);
    Console.ReadLine();
           
    t.Cancel();

    Console.WriteLine("Key press!");           
    Console.ReadLine();
}

private static void WriteInConsole()
{
    for (var i = 0; i < int.MaxValue; i++)
    {
        Task.Delay(1000).WaitWithCancellation();
        Console.WriteLine("Task running {0}", i);
    }
}

As you can see, we removed all the noise. Looking at the code, you will see that instead of using Task.Run, we use Tasks.StartWithCancellation because we want to create the task with a CancellationToken.

Diving deep into the bits...

Create a custom Task Factory method

We will create a task factory method for supporting cancellation and an ambient context. Here is what the code looks like :

public static Task StartWithCancellation(Action action)
{
    var source = new CancellationTokenSource();
    var token = source.Token;

    if (_ambientToken.CanBeCanceled)
    {
        token = CancellationTokenSource
            .CreateLinkedTokenSource(token, _ambientToken)
            .Token;
    }

    Action a = () =>
    {
        _ambientToken = token;
        try
        {
            action();
        }
        finally
        {
            _ambientToken = CancellationToken.None;
        }
    };

    var task = Task.Factory.StartNew(a, token);

    _sources.Add(task, source);
           
    return task;
}

The first important thing here is that we are using an ambient token. The ambient token is a static field declared with the ThreadStatic attribute.

[ThreadStatic]
private static CancellationToken _ambientToken;

This is useful because it gives us a contextual CancellationToken relative to the current thread, so we can « enlist » more tasks or Wait using the same propagated CancellationToken.
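One consequence of ThreadStatic is worth seeing in isolation: each thread gets its own copy of the field, so a value set on one thread is not visible from another. That is why StartWithCancellation assigns _ambientToken inside the wrapped action, on the thread that actually runs the task. Here is a minimal sketch; the AmbientDemo class and its method are hypothetical names used only for this demonstration.

```csharp
using System;
using System.Threading;

public static class AmbientDemo
{
    // Each thread sees its own independent copy of this field.
    [ThreadStatic]
    private static string _ambient;

    // Sets the field on the calling thread, then reports what a new thread sees.
    public static string ReadFromOtherThread(string valueOnCaller)
    {
        _ambient = valueOnCaller;

        string seen = "unset";
        var thread = new Thread(() => seen = _ambient ?? "null");
        thread.Start();
        thread.Join();

        // The new thread never assigned the field, so it observed the default (null).
        return seen;
    }
}
```

AmbientDemo.ReadFromOtherThread("caller") returns "null", because the value assigned on the calling thread is not shared with the new thread.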
Another important thing here is that we need to allow fluent cancellation of the task. To do this, we need to attach a CancellationTokenSource to the Task. Starting with .NET 4.0, there is a very neat way of doing this: the ConditionalWeakTable.

private static readonly ConditionalWeakTable<Task, CancellationTokenSource> _sources = new ConditionalWeakTable<Task, CancellationTokenSource>();

Here is how we link the CancellationTokenSource to the Task:

_sources.Add(task, source);
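If you have never used a ConditionalWeakTable, here is a small standalone sketch of the same attach-and-look-up pattern. The Attached class and its SetLabel and GetLabelOrNull methods are hypothetical names for this illustration; only Add and TryGetValue come from the framework.

```csharp
using System.Runtime.CompilerServices;

public static class Attached
{
    // Entries do not keep their keys alive: once a key object is collected,
    // its entry disappears from the table.
    private static readonly ConditionalWeakTable<object, string> _labels =
        new ConditionalWeakTable<object, string>();

    // Attach a label to an arbitrary object.
    // Add throws an ArgumentException if the object already has an entry.
    public static void SetLabel(object target, string label)
    {
        _labels.Add(target, label);
    }

    // Look the label up without adding anything for unknown objects.
    public static string GetLabelOrNull(object target)
    {
        string label;
        return _labels.TryGetValue(target, out label) ? label : null;
    }
}
```

This behaves like adding a field to a class you do not own, which is exactly what we need to hang a CancellationTokenSource on a Task.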

Add a Cancel method on the Task

Thanks to the ConditionalWeakTable, we can find the CancellationTokenSource attached to the task and cancel it. Instead of deriving from Task and adding a method to the subclass, we will use an extension method to add a Cancel operation to the task.

public static void Cancel(this Task task)
{
    CancellationTokenSource cancellationTokenSource;
    // TryGetValue only looks the task up; it never adds an entry for unknown tasks.
    if (_sources.TryGetValue(task, out cancellationTokenSource))
        cancellationTokenSource.Cancel();
}

Wait with ambient cancellation

The last method we need to add is a Wait that can automatically cancel waiting based on our ambient CancellationToken. We do this with the following code :

public static void WaitWithCancellation(this Task task)
{
    task.Wait(_ambientToken);
}

Final Words

As you can see, a little infrastructure goes a long way toward improving the task cancellation scenario. I really hope that better cancellation support will come in the next version of the language or of .NET.
Happy coding.




Client Databases for Silverlight

When the time comes to use Silverlight's out-of-browser functionality, we have to face the fact that the application might run without an internet connection. In that case, the application must implement a local cache.

I have found some very interesting database solutions available for Silverlight.

siaqodb (in beta when this post was published)

This database uses LINQ natively to retrieve the data, which is stored in isolated storage on the client machine.

var product = new Product { Name = "Pencil", Price = 0.25 };

db.StoreObject(product);

var query = from Product p in db
            where p.Name == "Pencil"
            select p;

http://siaqodb.com/

Silverlight Database (in beta when this post was published)

Again, this database uses isolated storage to persist the offline data locally.

var db = Database.CreateDatabase("test");
db.CreateTable<Person>();

var person = new Person{ FirstName = "John", LastName = "Doe" };
db.Table<Person>().Add(person);

var q = from p in db.Table<Person>()
        where p.LastName == "Doe"
        select p;

http://silverdb.codeplex.com/

Others

db4o : They are working on a Silverlight port.
EffiProzSL : A port of the EffiProz database.
Perst : A port from Java.

Experiments

Google Gears and Silverlight
C#-SQLite running in Silverlight

Happy new year!

WCF RIA Services without code using a TriggerAction

When you drop a WCF RIA Services data source in Visual Studio 2010 Beta 2, you will see that it generates a Load button for you and assigns it an event handler that calls Load on the RIA data source.

I don’t know about you, but I find it easier to use a TriggerAction than to write the same code again and again.

In this post, I will discuss some TriggerActions that I have implemented and that help me write RIA applications faster.

LoadAction

Loading data using a data source in RIA Services is very simple: you only have to call Load. Why not make that even easier by exposing it in Microsoft Expression Blend 3 through a TriggerAction?

Here is the implementation of the LoadAction that targets a DomainDataSource.

public class LoadAction : TargetedTriggerAction<DomainDataSource>
{
    protected override void Invoke(object parameter)
    {
        var target = Target;
        if (target != null && !target.IsLoadingData)
            target.Load();
    }
}

Wow! That was easy. Three lines of code and we have our first action. Give it a try in Blend and you will see that it really helps.

SubmitChangesAction

Submitting data is also very simple. Here is how to do it using a TriggerAction:

public class SubmitChangesAction : TargetedTriggerAction<DomainDataSource>
{
    protected override void Invoke(object parameter)
    {
        var target = Target;
        if (target != null)
            target.SubmitChanges();
    }
}

NavigateAction

OK, this one is a bit harder to implement, but it will help you. To navigate in Silverlight, we need an instance of the NavigationService on which to call the Navigate method. Again, this can be simplified.

Create a TriggerAction to be attached to the Page, since this is where we can get an instance of the NavigationService. We will add a property called PageUri, which is the page to navigate to when the action is invoked:

public class NavigateAction : TriggerAction<Page>
{
   #region PageUri DependencyProperty
   public string PageUri
   {
      get { return (string)GetValue(PageUriProperty); }
      set { SetValue(PageUriProperty, value); }
   }

   public static readonly DependencyProperty PageUriProperty = DependencyProperty.Register
   (
      "PageUri",
      typeof(string),
      typeof(NavigateAction),
      new PropertyMetadata(null)
   );
   #endregion

   protected override void Invoke(object parameter)
   {
      var pageUri = PageUri;

      // Nothing to navigate to.
      if (string.IsNullOrEmpty(pageUri))
         return;

      var page = AssociatedObject;
      var uri = new Uri(pageUri, UriKind.RelativeOrAbsolute);
      var service = page.NavigationService;

      if (service != null)
         service.Navigate(uri);
   }
}

NavigateAction with QueryString parameters

Good, we have a working NavigateAction. I think we can do much better, though. How about supporting parameters passed in the query string? Here is the new implementation:

[ContentProperty("Parameters")]
public sealed class NavigateAction : TriggerAction<Page>
{
   public NavigateAction()
   {
      Parameters = new ObservableCollection<NavigateActionParameter>();
   }

   #region Methods
   private Uri CreateUri()
   {
      var s = PageUri;

      if (string.IsNullOrEmpty(s))
         return null;

      var c = s.Contains("?") ? '&' : '?';

      for (var i = 0; i < Parameters.Count; i++)
      {
         var parameter = Parameters[i];
         if (parameter == null)
            continue;

         var value = parameter.Value;
         if (value == null)
            continue;

         s += c + parameter.ParameterName + '=' + Uri.EscapeDataString(value);
         c = '&';
      }

      return new Uri(s, UriKind.RelativeOrAbsolute);
   }

   protected override void Invoke(object parameter)
   {
      var page = AssociatedObject;
      var uri = CreateUri();
      var service = page.NavigationService;

      if (uri != null && service != null)
         service.Navigate(uri);
   }
   #endregion

   #region Properties
   #region PageUri DependencyProperty
   public string PageUri
   {
      get { return (string)GetValue(PageUriProperty); }
      set { SetValue(PageUriProperty, value); }
   }

   public static readonly DependencyProperty PageUriProperty = DependencyProperty.Register
   (
      "PageUri",
      typeof(string),
      typeof(NavigateAction),
      new PropertyMetadata(null)
   );
   #endregion
   public ObservableCollection<NavigateActionParameter> Parameters { get; private set; }
   #endregion
}

public sealed class NavigateActionParameter : DependencyObject
{
   #region Properties
   public string ParameterName { get; set; }

   #region Value DependencyProperty
   public string Value
   {
      get { return (string)GetValue(ValueProperty); }
      set { SetValue(ValueProperty, value); }
   }

   public static readonly DependencyProperty ValueProperty = DependencyProperty.Register
   (
      "Value",
      typeof(string),
      typeof(NavigateActionParameter),
      new PropertyMetadata(null)
   );
   #endregion
   #endregion
}

You can now use the NavigateAction and add parameters to the query string in Blend without writing code. Here is a sample:

<l:NavigateAction PageUri="/BacklogItemView?Action=Update">
   <l:NavigateActionParameter ParameterName="Id" Value="{Binding Path=SelectedItem.Id, ElementName=dataGrid1}"/>
</l:NavigateAction>

This will pass a parameter named Id in the Query String.
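To see what the resulting URI looks like without running Silverlight, here is a standalone sketch of the same concatenation logic as CreateUri. The QueryStringSketch class and its Append method are hypothetical names, and the Id value 42 is made up for the example.

```csharp
using System;
using System.Collections.Generic;

public static class QueryStringSketch
{
    // Mirrors CreateUri above: start with '?' unless the URI already has a
    // query string, then join every further parameter with '&'.
    public static string Append(string pageUri, IEnumerable<KeyValuePair<string, string>> parameters)
    {
        var s = pageUri;
        var c = s.Contains("?") ? '&' : '?';

        foreach (var p in parameters)
        {
            // Parameters without a value are skipped, as in NavigateAction.
            if (p.Value == null)
                continue;

            s += c + p.Key + "=" + Uri.EscapeDataString(p.Value);
            c = '&';
        }

        return s;
    }
}
```

With the page URI from the sample and an Id of 42, Append returns "/BacklogItemView?Action=Update&Id=42". Note that, like the original, the sketch escapes only the value and not the parameter name.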

Happy programming!