The TPL Dataflow Library (TDF) builds on the TPL with features that enable you to build services that can produce and consume data asynchronously. The TDF is based around a set of data types that can act as sources and sinks of data, and linkages that connect sources to sinks.
The simplest sink is provided by the generic ActionBlock type. You send data to an ActionBlock object by calling the Post method, and you specify the processing to be performed for each item received by passing a delegate to the ActionBlock constructor. The following example shows how to create an ActionBlock object that passes all integer data it receives to the method named ProcessData.
ActionBlock<int> intProcessor = new ActionBlock<int>(i => ProcessData(i));
...
// Send data to the intProcessor ActionBlock
intProcessor.Post(99);
intProcessor.Post(1);
intProcessor.Post(101);
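Because Post returns as soon as the item has been accepted, the calling code often needs a way to find out when the block has finished its work. The following is a minimal sketch of one way to do this, assuming the released System.Threading.Tasks.Dataflow package, in which a block exposes a Complete method and a Completion task. The class and method names (ActionBlockCompletionSketch, SumPostedItemsAsync) are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockCompletionSketch
{
    // Hypothetical helper: posts every item to an ActionBlock and returns the sum
    // once the block has processed them all.
    public static async Task<int> SumPostedItemsAsync(params int[] items)
    {
        int total = 0;

        // By default an ActionBlock processes one item at a time,
        // so the delegate can update 'total' without extra locking.
        var summer = new ActionBlock<int>(i => { total += i; });

        foreach (int item in items)
        {
            // Post returns false if the block declines the item
            // (for example, after Complete has been called).
            if (!summer.Post(item))
            {
                throw new InvalidOperationException("The block declined an item.");
            }
        }

        // Signal that no more items will arrive, then wait for the queue to drain.
        summer.Complete();
        await summer.Completion;

        return total;
    }

    public static async Task Main()
    {
        int total = await SumPostedItemsAsync(99, 1, 101);
        Console.WriteLine(total); // prints 201
    }
}
```

Awaiting Completion rather than blocking on it keeps the calling thread free, which matters when the caller is a UI thread.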
The ActionBlock class is an example of an executor block; other executor block types are available, such as the TransformBlock and the TransformManyBlock classes. The TDF also includes a number of classes that simply implement data buffering; you can retrieve and process each item individually. A prime example is the generic BufferBlock class illustrated in the following example.
BufferBlock<int> buffer = new BufferBlock<int>();
// Add data to the BufferBlock
buffer.Post(99);
buffer.Post(100);
// Retrieve data from the BufferBlock
int data = buffer.Receive();
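// Alternatively, retrieve data asynchronously; the Task<int> yields the item when it completes
Task<int> t = buffer.ReceiveAsync();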
Rather than retrieving items manually, you can connect a source block to a target block by using the LinkTo method, so that items arriving at the source are passed to the target automatically. The following example links a BufferBlock object to an ActionBlock object.
ActionBlock<int> intProcessor = new ActionBlock<int>(i => ProcessData(i));
BufferBlock<int> buffer = new BufferBlock<int>();
// Connect the ActionBlock to the BufferBlock
// - items received by the BufferBlock are passed to the ActionBlock for processing
buffer.LinkTo(intProcessor);
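The other executor block types mentioned earlier can be linked in the same way. The following is a small sketch, not a definitive pattern, that chains two TransformBlock objects into an ActionBlock. It assumes the released System.Threading.Tasks.Dataflow package, where the DataflowLinkOptions class has a PropagateCompletion property. The names TransformPipelineSketch and FormatSquaresAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformPipelineSketch
{
    // Hypothetical helper: squares each input, formats it as text,
    // and collects the formatted strings in arrival order.
    public static async Task<List<string>> FormatSquaresAsync(IEnumerable<int> inputs)
    {
        var results = new List<string>();

        // Each TransformBlock receives an item, applies its delegate, and offers the result downstream.
        var square = new TransformBlock<int, int>(i => i * i);
        var format = new TransformBlock<int, string>(i => "value: " + i);

        // The ActionBlock is the sink; it runs one item at a time by default.
        var collect = new ActionBlock<string>(s => results.Add(s));

        // PropagateCompletion forwards completion from each source to its target.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        square.LinkTo(format, linkOptions);
        format.LinkTo(collect, linkOptions);

        foreach (int input in inputs)
        {
            square.Post(input);
        }

        // Completing the head of the pipeline lets completion flow through to the sink.
        square.Complete();
        await collect.Completion;

        return results;
    }

    public static async Task Main()
    {
        foreach (string line in await FormatSquaresAsync(new[] { 1, 2, 3 }))
        {
            Console.WriteLine(line);
        }
    }
}
```

With the default options each block preserves the order of its input, so the inputs 1, 2, 3 yield "value: 1", "value: 4", "value: 9".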
As a complete worked example, the following WPF application captures data generated by an event source and plots this data on a graph (the graphCanvas element that occupies the major part of the window). The user clicks the Plot Graph button to start the event source and begin collecting data. The data is also logged to a file, just to show how to use a BroadcastBlock object to direct data to multiple destinations. The following XAML markup defines the window.
<Window x:Class="GraphDemo.GraphWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="Graph Demo" Height="343" Width="675" ResizeMode="CanResize" SizeToContent="WidthAndHeight">
    <Grid>
        <Button Height="23" HorizontalAlignment="Left" Margin="12,38,0,0" Name="plotButton" VerticalAlignment="Top" Width="75" Click="plotButton_Click">Plot Graph</Button>
        <Canvas Height="304" Name="graphCanvas" Background="Black" Margin="105,0,0,0" VerticalAlignment="Top" HorizontalAlignment="Left" Width="548" />
    </Grid>
</Window>
The event source is implemented by the EventDataSource class shown in the following code. The GenerateData method calculates the data points and raises a pair of DataPointGenerated events for each one.
using System;
using System.Threading;
using System.Windows;

namespace GraphDemo
{
    public delegate void DataPointGeneratedDelegate(object sender, PointEventArgs pt);

    public class PointEventArgs : EventArgs
    {
        public Point Data;
    }

    class EventDataSource
    {
        #region public members
        public event DataPointGeneratedDelegate DataPointGenerated = null;

        // Calculate the points of the graph, raising a pair of events for each data point
        public void GenerateData(int sizeOfXDimension, int sizeOfYDimension)
        {
            int a = sizeOfXDimension / 2;
            int b = a * a;
            int c = sizeOfYDimension / 2;

            for (int x = 0; x < a; x++)
            {
                int s = x * x;
                double p = Math.Sqrt(b - s);
                for (double i = -p; i < p; i += 3)
                {
                    double r = Math.Sqrt(s + i * i) / a;
                    double q = (r - 1) * Math.Sin(24 * r);
                    double y = i / 3 + (q * c);

                    // Pause for 10 milliseconds between data points
                    Thread.Sleep(10);

                    // Raise one event for the point to the left of center and one for its mirror image
                    Point pt = new Point((int)(-x + a), (int)(y + c));
                    raiseDataPointGeneratedEvent(new PointEventArgs { Data = pt });
                    pt = new Point((int)(x + a), (int)(y + c));
                    raiseDataPointGeneratedEvent(new PointEventArgs { Data = pt });
                }
            }
        }
        #endregion

        #region private members
        private void raiseDataPointGeneratedEvent(PointEventArgs pt)
        {
            // Copy the delegate first, so that a handler unsubscribing on another thread
            // between the null check and the call cannot cause a NullReferenceException
            DataPointGeneratedDelegate handler = DataPointGenerated;
            if (handler != null)
                handler(this, pt);
        }
        #endregion
    }
}
Finally, the following code shows the application logic behind the GraphWindow window.
using System;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Media;
using System.Windows.Shapes;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace GraphDemo
{
    public partial class GraphWindow : Window
    {
        private int pixelWidth = 600;
        private int pixelHeight = 300;
        private EventDataSource dataSource = null;

        public GraphWindow()
        {
            InitializeComponent();
            dataSource = new EventDataSource();
        }

        private void plotButton_Click(object sender, RoutedEventArgs e)
        {
            // Capture a scheduler that runs tasks on the UI thread
            TaskScheduler uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

            // The BroadcastBlock offers a copy of each Point to every linked target
            BroadcastBlock<Point> pointBroadcaster = new BroadcastBlock<Point>(pt => pt);

            // Plotting must happen on the UI thread; logging uses the default scheduler
            ActionBlock<Point> graphPlotter = new ActionBlock<Point>(pt => plotPoint(pt, graphCanvas), new ExecutionDataflowBlockOptions { TaskScheduler = uiScheduler });
            ActionBlock<Point> dataLogger = new ActionBlock<Point>(pt => logPoint(pt));

            pointBroadcaster.LinkTo(graphPlotter);
            pointBroadcaster.LinkTo(dataLogger);

            // Post each data point raised by the event source to the broadcaster
            dataSource.DataPointGenerated += (s, pt) => pointBroadcaster.Post(pt.Data);

            // Run the event source on a background task so that the UI thread is not blocked
            Task.Factory.StartNew(() => dataSource.GenerateData(pixelWidth, pixelHeight));
        }

        // Log the data (code not shown)
        private void logPoint(Point pt)
        {
            ...
        }

        // Plot the data as a graph on the specified Canvas
        private void plotPoint(Point pointToPlot, Canvas c)
        {
            Rectangle point = new Rectangle();
            point.Width = 0.5;
            point.Height = 0.5;
            point.Fill = new SolidColorBrush(Colors.White);

            // Scale from the coordinate space of the data source to the size of the canvas
            double xCoord = pointToPlot.X * c.Width / this.pixelWidth;
            double yCoord = pointToPlot.Y * c.Height / this.pixelHeight;

            Canvas.SetLeft(point, xCoord);
            Canvas.SetTop(point, yCoord);
            c.Children.Add(point);
        }
    }
}
The graphPlotter ActionBlock executor takes each Point object passed to it and invokes the plotPoint method to plot the corresponding point on the graphCanvas element of the WPF window. Note that the Task created to actually run the plotPoint method must execute on the UI thread, so the ExecutionDataflowBlockOptions object provided to the ActionBlock constructor specifies that it should use the task scheduler for the user interface synchronization context. The dataLogger ActionBlock executor logs the same Point data to a file (the code is omitted from the example). In this case, the Task created to handle this data should not run on the UI thread, so the code does not specify any scheduler options, allowing the TDF to employ its default scheduling semantics.
Both ActionBlock executors are attached to the pointBroadcaster block by using the LinkTo method, before the code arranges to capture DataPointGenerated events from the event source and post them to the pointBroadcaster object. Finally, the code starts the event source running asynchronously.
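The fan-out arrangement described above can also be tried outside WPF. The following console sketch is an illustration under stated assumptions rather than the application's actual code: two counting delegates stand in for plotPoint and logPoint, integers stand in for Point values, and the names BroadcastSketch and BroadcastAsync are hypothetical. It assumes the released System.Threading.Tasks.Dataflow package and a compiler that supports tuples (C# 7 or later).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastSketch
{
    // Hypothetical helper: posts 'count' items to a BroadcastBlock linked to two
    // ActionBlocks and reports how many items each target received.
    public static async Task<(int Plotted, int Logged)> BroadcastAsync(int count)
    {
        int plotted = 0;
        int logged = 0;

        // The delegate passed to BroadcastBlock produces the copy offered to each target;
        // integers are immutable values, so returning the item itself is sufficient.
        var broadcaster = new BroadcastBlock<int>(item => item);

        // Stand-ins for the graphPlotter and dataLogger blocks of the WPF example.
        var plotter = new ActionBlock<int>(item => { plotted++; });
        var logger = new ActionBlock<int>(item => { logged++; });

        // Link both targets before posting, as the WPF example does.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(plotter, linkOptions);
        broadcaster.LinkTo(logger, linkOptions);

        for (int i = 0; i < count; i++)
        {
            broadcaster.Post(i);
        }

        // Complete the source and wait until both targets have drained their queues.
        broadcaster.Complete();
        await Task.WhenAll(plotter.Completion, logger.Completion);

        return (plotted, logged);
    }

    public static async Task Main()
    {
        var counts = await BroadcastAsync(10);
        Console.WriteLine("plotted: " + counts.Plotted + ", logged: " + counts.Logged);
    }
}
```

Linking the targets before any data is posted matters here: a BroadcastBlock only offers an item to the targets that are linked at the time, and a target linked later sees at most the most recent item.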
When you start the application and click Plot Graph, the application slowly displays the graph in the WPF window (the event source raises a pair of events for each data point every 10 milliseconds, but there are a large number of data points). However, the user interface is still responsive (you can move the window around) because the event source runs on a background task, each call to the plotPoint method runs as a separate, short-lived task on the UI thread, and the logPoint method runs away from the UI thread altogether.
The TDF is a highly extensible collection of types for implementing asynchronous data processing based on the producer/consumer model. This article has shown some of the basic features, but there are several other executor and buffer classes available. Furthermore, you can define your own custom executor and buffer classes by implementing the ISourceBlock, ITargetBlock, and IPropagatorBlock interfaces of the TDF. Visit the Introduction to TPL Dataflow page on the Microsoft Web site to download the library and documentation.