Wednesday, February 9, 2011

Reactive Extensions

The Windows Phone 7 platform includes the Reactive Extensions for .NET Framework (Rx for short) to enable you to implement a wide range of asynchronous behaviours on the phone. Rx is “reactive” because it enables you to write applications that react to external events such as data arriving over the network, a stream of data arriving from a hardware device or a user clicking a button in the UI. While Rx is bundled with the Windows Phone 7 API, you can also download it for use with standard .NET applications (and for use with Silverlight and JavaScript).
Rx requires a slightly different way of thinking about asynchronous behaviour and events. Its fundamental concept is that an observable generates a sequence of items, and an observer subscribes to that sequence to receive them. The other things to be aware of about the way that Rx works are:
  • It’s a push model — the observable instance pushes information to the observer instance.
  • The observer can filter the information that it receives by using LINQ queries.
Before going any further, here are a couple of very simple examples to illustrate some of these points. Let’s start with a simple enumerable sequence in C#:
int[] numbers = { 1, 2, 3, 5, 8, 13, 21 };
Arrays in .NET implement the IEnumerable interface that enables you to iterate over the array, pulling each item from the array:
foreach (var number in numbers)
{
    Console.WriteLine("Number: {0}", number);
}
For this example we can convert the array to an observable sequence that implements the IObservable interface:
IObservable<int> observableNumbers = numbers.ToObservable();
We can then observe this sequence by subscribing to the IObservable instance like this:
observableNumbers.Subscribe(
    number => Console.WriteLine("Number: {0}", number),
    ex => Console.WriteLine("Something went wrong: {0}", ex.Message),
    () => Console.WriteLine("At the end of the sequence")
);
This overload of the Subscribe method receives each number pushed at it by the observable sequence, and the three lambda expressions allow you to:
  • Handle each item in the sequence as it arrives.
  • Handle any exceptions raised.
  • Handle the end of the sequence.
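The three lambdas correspond to the three methods of the IObserver<T> interface that ships in the System namespace of .NET 4. The following is a minimal sketch of that contract written without the Rx library; ArrayObservable and ConsoleObserver are hypothetical names invented for this illustration, and ArrayObservable is only a stand-in for what ToObservable returns, not how Rx implements it.

```csharp
using System;

// Hypothetical stand-in for numbers.ToObservable(): pushes every array item
// to the observer, then signals the end of the sequence.
sealed class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] items;

    public ArrayObservable(T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in items)
        {
            observer.OnNext(item);   // push model: the source calls the observer
        }
        observer.OnCompleted();
        return new NoopDisposable(); // nothing to unhook in this simple sketch
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: each method plays the role of one lambda above.
sealed class ConsoleObserver : IObserver<int>
{
    public void OnNext(int value)
    {
        Console.WriteLine("Number: {0}", value);
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Something went wrong: {0}", error.Message);
    }

    public void OnCompleted()
    {
        Console.WriteLine("At the end of the sequence");
    }
}

static class Program
{
    static void Main()
    {
        int[] numbers = { 1, 2, 3, 5, 8, 13, 21 };
        IObservable<int> observableNumbers = new ArrayObservable<int>(numbers);

        IDisposable subscription = observableNumbers.Subscribe(new ConsoleObserver());
        subscription.Dispose();
    }
}
```

Running this prints one "Number:" line per array item followed by "At the end of the sequence". The lambda-based Subscribe overload in Rx saves you from writing an observer class like this by hand.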
Rx supports a wide range of use cases, and there are some excellent resources available to help you start exploring how you can use Rx.
However, to start understanding the Rx approach I found it useful to work out how to do things in Rx that I already knew how to do with other approaches. The following examples show some of these scenarios:

Handling Events

Writing code to handle events is a very common task. The following code shows how you might prevent a user from entering non-numeric characters into a text box:
<TextBox Height="23" Name="textBox1" Width="120" KeyDown="textBox1_KeyDown" />
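private void textBox1_KeyDown(object sender, KeyEventArgs e)
{
    // Suppress any key outside the numeric keypad range (NumPad0 to NumPad9).
    // Note: this simple check also suppresses the top-row digit keys (Key.D0 to Key.D9).
    if (e.Key < Key.NumPad0 || e.Key > Key.NumPad9)
    {
        e.Handled = true;
    }
}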
To do the same thing using Rx, you could do this:
<TextBox Height="23" Name="textBox2" Width="120" />
Notice that you don’t need the KeyDown attribute in the XAML code for this approach to work.
var keys = from evt in Observable.FromEvent<KeyEventArgs>(textBox2, "KeyDown")
           where (evt.EventArgs.Key < Key.NumPad0 || evt.EventArgs.Key > Key.NumPad9)
           select evt.EventArgs;
The FromEvent method enables you to treat an event as an observable sequence. This example also shows how you can use a LINQ expression to filter/query the items in the observable sequence; here you are only selecting non-numeric characters.
// Ideally you should dispose of keysSubscription when the Window is disposed.
var keysSubscription = keys.Subscribe(evt =>
{
    evt.Handled = true;
    label1.Content = "Invalid character: " + evt.Key.ToString();
});
The Subscribe method here receives any non-numeric keystrokes in the textBox2 control, discards them and then updates a label control.
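The comment about disposing of the subscription matters because Subscribe returns an IDisposable, and disposing it is what detaches the observer from the event. The following sketch illustrates that idea using only the System namespace; KeySource, KeyPressedEventArgs, KeyDownObservable and PrintingObserver are hypothetical types made up for this example, and the adapter is a simplified assumption about how an event-to-observable bridge can work rather than the actual Rx implementation.

```csharp
using System;

// Hypothetical event arguments and event source standing in for a TextBox.
sealed class KeyPressedEventArgs : EventArgs
{
    public KeyPressedEventArgs(char key) { Key = key; }
    public char Key { get; private set; }
}

sealed class KeySource
{
    public event EventHandler<KeyPressedEventArgs> KeyDown;

    public void Press(char key)
    {
        var handler = KeyDown;
        if (handler != null) handler(this, new KeyPressedEventArgs(key));
    }
}

// Hypothetical adapter: attaches a handler on Subscribe, detaches it on Dispose.
sealed class KeyDownObservable : IObservable<KeyPressedEventArgs>
{
    private readonly KeySource source;

    public KeyDownObservable(KeySource source) { this.source = source; }

    public IDisposable Subscribe(IObserver<KeyPressedEventArgs> observer)
    {
        EventHandler<KeyPressedEventArgs> handler = (sender, e) => observer.OnNext(e);
        source.KeyDown += handler;
        return new Unsubscriber(() => source.KeyDown -= handler);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action detach;

        public Unsubscriber(Action detach) { this.detach = detach; }

        public void Dispose()
        {
            // Detach only once, even if Dispose is called repeatedly.
            if (detach != null)
            {
                detach();
                detach = null;
            }
        }
    }
}

sealed class PrintingObserver : IObserver<KeyPressedEventArgs>
{
    public void OnNext(KeyPressedEventArgs e) { Console.WriteLine("Key: {0}", e.Key); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class Program
{
    static void Main()
    {
        var source = new KeySource();
        IDisposable subscription =
            new KeyDownObservable(source).Subscribe(new PrintingObserver());

        source.Press('a');       // observed: the handler is attached
        subscription.Dispose();  // detaches the handler from the event
        source.Press('b');       // not observed: nothing is subscribed any more
    }
}
```

Only the first key press is printed. In a WPF window the equivalent step would be to call Dispose on keysSubscription when the window closes, so that the subscription does not keep handling events longer than intended.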

Running a Background Task

The following example outlines a standard approach to running a background task in a WPF application using the Task class. See John Sharp’s post here for more background on the Task class and the Task Parallel Library.
private void button1_Click(object sender, RoutedEventArgs e)
{
    Task task = new Task(doWork);
    task.Start();
}

delegate void ContentSetterDelegate(Label label, string text);
private void doWork()
{
    // Work really hard at something.
    Thread.Sleep(2000);
    this.Dispatcher.Invoke(new ContentSetterDelegate(setContentProperty), label1, "Finished the work at last!");
}

private void setContentProperty(Label label, string text)
{
    label.Content = text;
}
One thing to note about this is the use of the Dispatcher.Invoke method to update the UI from a background thread.
To achieve the same results using Rx, you could use the following code:
var clicks = from evt in Observable.FromEvent<RoutedEventArgs>(button2, "Click")
             select evt;
var clicksSubscription = clicks.Subscribe(evt =>
{
    var backgroundTask = Observable.Start(() =>
    {
        // Work really hard at something.
        Thread.Sleep(3000);
    }).ObserveOnDispatcher();

    backgroundTask.Subscribe(
        _ => { },
        () => label1.Content = "It's all done now!"
    );
});
This example responds to a click event using the same technique as the previous Rx example, and then runs the background task by using the Observable.Start method. The ObserveOnDispatcher method ensures that the observer runs on the dispatcher thread so that it can safely update the UI.

Synchronizing Tasks

John Sharp’s post here also described how you could synchronize several background tasks using the WaitAll and WaitAny methods in the Task Parallel Library. You can do a similar thing using Rx like this:
var task1 = Observable.Start(() =>
{
    // Work really hard and return a result
    Thread.Sleep(4000);
    return 10;
});

var task2 = Observable.Start(() =>
{
    // Work really hard and return a result
    Thread.Sleep(3000);
    return 30;
});

var tasks = Observable.ForkJoin(
    task1, task2
).Finally(() => Console.WriteLine("Done!"));

// Wait for all the tasks to finish
var results = tasks.First();

// Process all the results
int sum = 0;
foreach (int r in results)
    sum += r;
Console.WriteLine("The sum of all the tasks was: {0}", sum);
This example uses the Observable.Start method to define two background tasks, each of which returns an integer result. The ForkJoin method enables you to wait for both tasks to complete, and the example code then prints a “Done” message using the Finally method. You can then access the results of all the background tasks by using the First method (First because you want the first, and only, set of results from the tasks sequence).
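For comparison, here is a rough sketch of the same wait-for-everything pattern written with the Task Parallel Library's WaitAll method. This is my own console approximation rather than the code from John Sharp's post, and the sleep durations are shortened purely to keep the sample quick to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        // Each task does some (simulated) work and returns an integer result.
        Task<int> task1 = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(400);
            return 10;
        });

        Task<int> task2 = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(300);
            return 30;
        });

        // Block until both tasks have finished.
        Task.WaitAll(task1, task2);
        Console.WriteLine("Done!");

        // Process all the results.
        int sum = task1.Result + task2.Result;
        Console.WriteLine("The sum of all the tasks was: {0}", sum);
    }
}
```

Here the results are read from each task's Result property, whereas the Rx version receives them together as a single item pushed by the ForkJoin sequence.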
There’s an interesting discussion about the differences between running multiple background tasks with Rx and using the Task Parallel Library here: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/12d3f79a-0a53-4854-976e-5fa0d86f01ee/.
