The Microsoft Reactive Extensions (RX) are a library of methods and types that extend the LINQ standard query operators to encompass event-based data sources and asynchronous operations. RX extends the ability of LINQ to handle dynamic observable collections.
In the traditional LINQ model, you define an enumerable collection of objects, and then iterate over that collection and process each item in turn. The collection that you iterate over must provide a means of enumerating elements in that collection, and so it commonly implements the IEnumerable interface (either directly or indirectly, possibly via the IQueryable interface). The IEnumerable interface defines the GetEnumerator method, which returns an IEnumerator object. This object actually does the work of retrieving elements from the collection, providing a property called Current which returns the current item from the collection, and a method named MoveNext which is used to move on to the next item in the collection (returning true if there is such an item, or false if there are no more items). LINQ enables you to write code that looks like this, where customers is an enumerable collection of customer objects:
var customersAgedOver30 = from cust in customers
                          where cust.Age > 30
                          select cust;
foreach (var customer in customersAgedOver30)
{
    ProcessCustomer(customer);
}
This much is “old hat”, and most .NET Framework developers are familiar with this model. However, the scheme specified by implementing the IEnumerable interface is focused on explicitly pulling data, on demand, from the data source. If there is no data left, the MoveNext method of the enumerator returns false, and the application processing the data assumes that there is no more data to fetch, so it stops trying to retrieve any more.
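The pull model described above can be made explicit by writing out the calls that a foreach loop makes on your behalf. The following is a minimal sketch only; the Customer class, its Name and Age properties, and the sample data are hypothetical and exist purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical customer type, defined only for this illustration.
public class Customer
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class PullModelDemo
{
    // Walks the collection by hand, the way foreach does behind the scenes.
    public static List<string> NamesOfCustomersOver30(IEnumerable<Customer> customers)
    {
        var names = new List<string>();
        using (IEnumerator<Customer> enumerator = customers.GetEnumerator())
        {
            // The consumer pulls each item; MoveNext returns false when none remain.
            while (enumerator.MoveNext())
            {
                Customer cust = enumerator.Current;
                if (cust.Age > 30)
                {
                    names.Add(cust.Name);
                }
            }
        }
        return names;
    }

    public static void Main()
    {
        var customers = new List<Customer>
        {
            new Customer { Name = "Ann", Age = 42 },
            new Customer { Name = "Bob", Age = 25 },
            new Customer { Name = "Cy", Age = 31 }
        };

        // Prints "Ann" and then "Cy".
        foreach (string name in NamesOfCustomersOver30(customers))
        {
            Console.WriteLine(name);
        }
    }
}
```

Notice that the consumer drives every step: nothing happens until the loop asks for the next item, and the loop ends as soon as MoveNext reports that the collection is exhausted.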
RX takes the view that not all collections match this behavior. Instead, RX enables you to define an observable collection. An observable collection is a dynamic set containing a potentially infinite number of elements; new elements may be added to the collection at any time. If you want to examine the data in this collection, then merely attempting to enumerate the elements that it contains is no longer an appropriate strategy (enumerating an infinite collection will take a very long time!). The Observer pattern provides an ideal solution to this problem, based on pushing data out to parties that are interested in it. This pattern has been around for years, and is well-documented by Gamma et al. in their Design Patterns book.
The Observer pattern defines two types of entity: subjects that expose data, and observers that need to know when this data changes. A subject provides a means to enable observers to register their interest, and then notifies these observers when data is updated. In the past, the .NET Framework has supported the Observer pattern through the ObservableCollection class, but RX enables a more generalized implementation through the Observer and Notification types, together with a number of extension methods that make the functionality of these types accessible through LINQ. Using RX, you can easily convert an enumerable collection into an observable one simply by applying the ToObservable extension method, and then you can create an observer by using the Subscribe method of the observable collection; this method expects you to provide a delegate to a method that runs as each element in the collection is observed.
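As a rough illustration of the conversion described above, the following sketch turns an ordinary list into an observable sequence and subscribes to it. It assumes that the project references the System.Reactive package; the ToObservableDemo class and the sample values are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ToObservableDemo
{
    // Converts an enumerable source into an observable one and records
    // every value that is pushed to the subscriber.
    public static List<int> Observe(IEnumerable<int> source)
    {
        var seen = new List<int>();
        IObservable<int> observable = source.ToObservable();

        // The delegate passed to Subscribe runs once for each element observed.
        using (observable.Subscribe(value => seen.Add(value)))
        {
            // Assumption: with the default scheduler used by ToObservable, a
            // finite source has already been pushed by the time Subscribe returns.
        }
        return seen;
    }

    public static void Main()
    {
        foreach (int value in Observe(new[] { 10, 20, 30 }))
        {
            Console.WriteLine(value);
        }
    }
}
```

The key difference from the enumerable version is the direction of control: the observable sequence calls your delegate when a value is available, rather than your code asking for the next value.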
At its simplest, you might regard RX as little more than a recasting of the code that iterates through enumerable collections. However, the real power of RX lies in being able to apply LINQ semantics to data that was previously non-enumerable. Consider a device that captures data such as the locations and magnitudes of seismic waves caused by an earthquake, and that raises an event each time a new shock is detected. If you were writing a Windows program to capture and process this data, you would typically add a handler that listens to the corresponding events emitted by the device with code similar to this:
EventDataSource earthQuakeDataSource = new EventDataSource();
...
earthQuakeDataSource.EarthquakeDetected += (source, eventArgs) => ProcessData(eventArgs);
Now suppose that you want to filter the data so that the code processes only earthquakes of magnitude 5 or more. You might amend the code in this way:
earthQuakeDataSource.EarthquakeDetected += (source, eventArgs) =>
{
    if (eventArgs.Magnitude >= 5)
        ProcessData(eventArgs);
};
Additionally, consider what you might need to do if the eventArgs object contains a lot of information that is superfluous to the ProcessData method, and you want to pass only the data in the Location property (specifying the coordinates of the epicenter of the earthquake) of this object to the ProcessData method:
earthQuakeDataSource.EarthquakeDetected += (source, eventArgs) =>
{
    if (eventArgs.Magnitude >= 5)
        ProcessData(eventArgs.Location);
};
Each of these coding changes is arguably quite small, but each one starts to obscure the information that you are passing to the ProcessData method. As you add more conditions (for example, suppose you wanted to refine the data further and only capture the details of earthquakes recorded in Alaska), the code could quickly become much more complicated. If you need to change the magnitude and location requirements at a later date, you could quite easily miss the code that implements them.
If you think about it, what you are actually doing is applying a predicate to filter the data, and then performing a projection operation. This is exactly the sort of thing that LINQ is good at with its where and select operators. However, events are not an enumerable data source, and applying LINQ to them has traditionally been quite tricky. RX fixes this.
With RX you can observe events using the static FromEventPattern method of the Observable class. This is a generic method that takes the event source and the name of the event as arguments. You can then subscribe to this observable collection and arrange for a piece of code to be run each time a new event is detected, as follows:
var earthquakeEvents = Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "EarthquakeDetected");
var subscription = earthquakeEvents.Subscribe(args => ProcessData(args.EventArgs.Location));
Note: Although it exhibits some collection behavior, the earthquakeEvents variable is not really a true collection; the data defined for each event is not retained in any form of queryable structure, and once the corresponding notification to call the ProcessData method has been fired, the data that defines the event is discarded.
The Observable class provides extension methods that enable you to apply LINQ operators, so to filter and project the earthquake event data as specified earlier you can simply add the appropriate where and select clauses:
var earthquakeEvents = from evt in Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "EarthquakeDetected")
                       where evt.EventArgs.Magnitude >= 5
                       select evt.EventArgs.Location;
var subscription = earthquakeEvents.Subscribe(args => ProcessData(args));
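To try this filtering and projection end to end, you need an event source to observe. The following sketch supplies one under stated assumptions: the QuakeEventArgs and EventDataSource classes shown here are hypothetical stand-ins (the real device types are not described in this text), the Location property is simplified to a string, the event is named EarthquakeDetected to match the earlier handler examples, and the project is assumed to reference the System.Reactive package. The query is written with the Where and Select extension methods, which is the method-call form of the where and select clauses.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event payload; Location is simplified to a string here.
public class QuakeEventArgs : EventArgs
{
    public double Magnitude { get; set; }
    public string Location { get; set; }
}

// Hypothetical stand-in for the seismic device.
public class EventDataSource
{
    public event EventHandler<QuakeEventArgs> EarthquakeDetected;

    // Simulates the device detecting a shock.
    public void Report(double magnitude, string location)
    {
        EarthquakeDetected?.Invoke(
            this, new QuakeEventArgs { Magnitude = magnitude, Location = location });
    }
}

public static class QuakeFilterDemo
{
    public static List<string> Run()
    {
        var earthQuakeDataSource = new EventDataSource();
        var processed = new List<string>();

        // Filter on magnitude, then project to the location only.
        IObservable<string> strongQuakeLocations =
            Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "EarthquakeDetected")
                      .Where(evt => evt.EventArgs.Magnitude >= 5)
                      .Select(evt => evt.EventArgs.Location);

        using (strongQuakeLocations.Subscribe(location => processed.Add(location)))
        {
            earthQuakeDataSource.Report(4.2, "Nevada"); // filtered out
            earthQuakeDataSource.Report(6.1, "Alaska");
            earthQuakeDataSource.Report(5.0, "Chile");
        }

        // The subscription has been disposed, so this event is not observed.
        earthQuakeDataSource.Report(7.3, "Japan");
        return processed;
    }

    public static void Main()
    {
        foreach (string location in Run())
        {
            Console.WriteLine(location);
        }
    }
}
```

Disposing the object returned by Subscribe detaches the underlying event handler, which is why the final event in this sketch is never processed.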
At first glance it might seem that the FromEventPattern method really does little more than provide an alternative syntax for handling events, but as soon as you adopt this approach you can quickly start to gain from many of the other benefits that RX provides. To extend the earthquake device scenario, suppose that you wanted to stop monitoring for earthquake events if the user pressed the Escape key on the keyboard. In the traditional approach, you might add another event handler to listen for keyboard events, filter these events to determine whether the user had pressed the Escape key, and then remove the handler from the EarthquakeDetected event of the earthQuakeDataSource object. You can achieve the same results with RX by observing the KeyDown event, and then applying the TakeUntil method to combine the two event observations together, as follows:
var earthquakeEvents = from evt in Observable.FromEventPattern<QuakeEventArgs>(earthQuakeDataSource, "EarthquakeDetected")
                       where evt.EventArgs.Magnitude >= 5
                       select evt.EventArgs.Location;
var escapeKeyPressed = from key in Observable.FromEventPattern<KeyEventArgs>(this, "KeyDown")
                       where key.EventArgs.Key == Key.Escape
                       select key;
var dataUntilEscPressed = earthquakeEvents.TakeUntil(escapeKeyPressed);
var subscription = dataUntilEscPressed.Subscribe(args => ProcessData(args));
The TakeUntil method relays items from the first observable sequence until the observable sequence passed as its parameter produces an item; at that point, the subscription to the first sequence is canceled.
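You can see the behavior of TakeUntil in isolation without a keyboard or a seismic device. The following sketch uses two Subject objects from the System.Reactive.Subjects namespace as hypothetical stand-ins for the earthquake and Escape key observations; it assumes that the project references the System.Reactive package, and the sample locations are invented.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilDemo
{
    public static List<string> Run()
    {
        // Stand-ins for the two event observations.
        var quakeLocations = new Subject<string>();
        var escapePressed = new Subject<bool>();
        var received = new List<string>();

        quakeLocations
            .TakeUntil(escapePressed)
            .Subscribe(
                location => received.Add(location),
                () => received.Add("<completed>"));

        quakeLocations.OnNext("Alaska");
        quakeLocations.OnNext("Chile");

        // Simulates the Escape key: the combined sequence completes here.
        escapePressed.OnNext(true);

        // Raised after the stop signal, so it is never delivered.
        quakeLocations.OnNext("Japan");

        return received;
    }

    public static void Main()
    {
        // Prints "Alaska", "Chile", and then "<completed>".
        foreach (string entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

The subscriber does not need any code to test for the stop condition; the composition of the two sequences expresses it.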
Using RX to subscribe to events enables you to separate the code that handles the events from the definitions of those events, leading to more easily readable and maintainable code. Additionally, the ability that RX provides for composing and combining events together makes for a very elegant solution to many common event-handling problems.