Dataflow Programming with Maestro

Disclaimer: The Maestro project has been renamed. While referred to in this article as ‘Maestro’, moving forward the project is referred to with codename ‘Axum’.

The way you perform an operation in a typical imperative programming language goes like this: you call a method giving it the data it needs as arguments, wait for that method to finish, then pass the result to other methods, and so on. Sometimes a method you call relies not only on the parameters passed in, but also on some shared state, such a global variable, so you must be careful not call the method before the global variable is set. This is hard enough but it gets hairier still when multiple threads get involved.

The model where you arrange the control flow in your program explicitly to take into account implicit dependencies between data is sometimes contrasted with the dataflow programming model, in which the dependencies between data is explicit, and the flow of the operations on that data is determined by these dependencies. The graph of these dependencies is called the dataflow network.

The concept is familiar to anyone who ever worked with spreadsheets, such as Microsoft Excel or Lotus 1-2-3. When a cell in a spreadsheet depends on another cell, availability of new data in that cell drives re-calculation of the dependent cell. A group of cells that do not depend on each other can be re-calculated in parallel. This is the essence of the dataflow programming model.

Sources and Targets

In Maestro, the basic building blocks of the dataflow networks are called the source and the target. The source is where the data comes from, and the target is where the data goes to. In theory, there can be many ways to pass the data from a source to a target. In Maestro, this is done via message-passing.

This is how we can we can define a source and a target variables in Maestro:

ISource<int> source = ...;
ITarget<int> target = ...;

(Here I intentionally omitted the right-hand side of the initialization statement, because it would lead us into discussion about ports and channels – all the fascinating things that I want to cover in future articles!)

Given these two variables, we have enough to build our first dataflow network. Here it is:

source ==> target;

The immediate effect of the above statement is to take all the messages from the source and forward them to the target. But there is more to it than that. The forwarding operator ==> creates a lasting link between the source and the target, so that whenever a new message comes out of the source it will be automatically propagated to the target.

You can get a hold of the network produced by the forwarding expression, and close it when the forwarding is no longer required:

var myNetwork = source ==> target;
// do something
myNetwork.Dispose();

Or, with a using statement (which in Maestro looks just like in C#):

using(var myNetwork = source ==> target)
{
    // do something
}

This works, but doesn’t do anything interesting. Bear with me; things are going to get more exciting in a minute!

Sinks and Transforms

To do anything useful with a dataflow network, we want to be able to perform an operation on the data as it comes out of the source. Here is what you do: you create a target from a method that takes the message payload as a parameter:

// set up dataflow network
source ==> ProcessNumber;
...
void ProcessNumber(int n)
{
// print the number on the console
    Console.WriteLine("n={0}", n);
}

We can build something even more realistic than this. Imagine an event-driven GUI application that handles mouse click and key press:

ISource<Point> mouseClick;
ISource<Key> keyPress;
...
mouseClick ==> OnMouseClick;
keyPress ==> OnKeyPress;
...

void OnMouseClick (Point coordinates)
{
    // handle mouse click here
}
void OnKeyPress (Key key)
{
    // handle key press here
}

You know where I’m going with this: handling the two events asynchronously, making your application faster and more responsive, and so on. This is indeed the point, but I’m getting a little ahead of myself. Hold on to this thought…

OnMouseClick and OnKeyPress methods above take a single parameter and return nothing. Appropriately, they are called sinks – the data disappears in them, never to be seen again.

A method that takes a parameter and returns a value (not necessarily of the same type) is called a transform. Here is a transform that takes absolute screen coordinates and translates them to the current window coordinates:

Point ScreenToWindow(Point point)
{
return new Point(X = point.X – currentWindow.Left,
Y = point.Y – currentWindow.Top);
}

Transform is a target, because it accepts data. It is also a source, because it produces a result.

Using a transform in a network expression goes like this:

mouseClick ==> ScreenToWindow ==> OnMouseClick;

The dataflow network that we’ve just created is called a pipeline. In such a network, data entering the first element makes its way through the intermediate transforms, finally reaching the last element, which is usually a sink.

Here is another realistic example. You want to send the pictures from your recent family trip to Maui to your mother-in-law. You need to resize and filter each picture, remove red eye effect, save it in a different format and so on. The pipeline will look like this:

imageFromFile==>Resize==>Filter==>RemoveRedEye==>Save;

Once the pipeline is set up, you would send images to the leftmost element of the pipeline thusly:

for(var file in files)
{
    send(imageFromFile, file);
}

The goodness of the pipeline is that not only can we process multiple images at a time, when we do so, we can also invoke multiple stages of the pipeline in parallel. This wouldn’t be the case if we were to do it in the following manner:

for(var file in files)
{
    Save(RemoveRedEye(Filter(Resize(file))));
}

This gets us back to the question of the asynchronous invocation of the nodes of the dataflow network. It’s time we tackle this question head on.

Side Effects

Recall the GUI example with methods OnMouseClick and OnKeyPress. It’s tempting to let the two methods execute concurrently. However, if the methods access a shared object in an unsafe way, their concurrent execution would result in a race condition – a nasty, hard to debug defect.

To execute the two methods safely, one of the following conditions must hold:

  • The methods don’t read or write shared mutable state;
  • The methods can read a shared object, but no other thread in the system is allowed to modify the object at the same time.

The first would be best: a method that doesn’t rely on the shared mutable state is safe to execute at any time, regardless of anything else that goes on in the system. (I used the word mutable here because immutable state doesn’t change so access to it doesn’t require synchronization. Think of it as a constant in your code). Repeat invocations of such methods – they are called referentially transparent – would produce the same result, and can be cached. Referentially transparent methods are easy to compose. Programming models that allow only referentially transparent methods are very good for writing parallel programs. They are not very good for writing useful programs – not in the impure world we live in, anyways.

The other kinds of methods are called side-effect free. To be able to reason about side-effects we need a well-defined concept of isolation, i.e. the boundary of the side-effect. The way I stated it – “shared object” – leaves some ambiguity as to what exactly is the object, and its granularity. If a thread modifies an object’s field, can another thread safely read other fields of the object? What about objects referenced by this object? Objects referring to the original object?

The units of isolation in Maestro are the agent and the domain. The data in the domain is shared among the agents in a “disciplined” way that avoids data races. Multiple domains do not share data with one another. (To read more about it, check out this blog post by Niklas Gustafsson, our architect).

Since domains don’t share data (guaranteed by the language and enforced by the compiler), methods from different domains can execute in parallel with each other. So if OnMouseClick and OnKeyPress were defined in different domains, we would be done here.

Methods in the same domain can share domain data, and methods in the same agent can share both domain and agent data. If we want to execute such methods in parallel, we need to ensure that the two conditions above hold for them. Namely, either the methods don’t read any domain or agent state, or if they do, no other method can write to it while these methods are executing.

Modern compilers can do amazing things, and our friends at Microsoft Research have come up with a way to analyze a method for its read- and write-effects. You can find some of their results here. While this technology is promising, it’s not yet ready for prime time. Two things make such analysis complex. First, while it’s relatively easy to analyze the objects directly affected by the method, we also have to analyze all the invoked methods too, and that, transitively, means having to analyze the entire program. For the programmer, it means having to wait longer for the compiler to finish.

Second, restricting a method from modifying any object would be overly limiting. We still want to allow modification of objects created by the method itself, or by the methods it calls – provided that these objects don’t “leak” into different threads. Keeping track of the ownership of objects, and graphs of objects is incredibly complex. Again, this complexity translates into your time spent staring at the screen waiting for the build to complete.

We also thought about using the Software Transactional Memory technology, developed by our peers in the Parallel Computing Team. Using STM, we would allow the method to modify any object, while keeping the log of the changes. If two methods make a conflicting change, we would undo the changes made by one of the methods, and then re-execute it. STM seems like a good fit for us, but there are some challenges. For a method to be transactable, we must be able to undo everything caused by its execution. This includes sending and receiving messages, as well as everything that happens in agents that receive those messages. Suddenly, you have a transaction that spans multiple agents and threads, and that complicates things tremendously. An obvious tradeoff would be to disallow message-passing and other “questionable” operations inside a transaction – but that would make dataflow networks in Maestro less useful. In other words, we still have some thinking to do here.

We’ll keep looking into these and other alternatives. In the meantime, the solution that we settled on is more pragmatic: it makes the compiler job easier, but it requires a little bit of hand-holding by the programmer.

Instead of relying on the compiler to infer the effects of the method, you declare the method to be a side-effect free function:

function Point ScreenToWindow(Point point)
{
    ...
}

A function cannot modify the state of the agent or the domain it is defined in. A function can only call other functions of the same agent – this is how we avoid having to perform full program analysis.

There is actually more to it, and I hope to cover it in detail in some other post, but you’ve already got the gist of the idea: the programmer explicitly describes the read- and write-effects of the method, the compiler makes sure that the code conforms to the declaration, and the runtime schedules the execution of the methods in way that avoids data races but maximizes parallelism.

Scalars and Vectors

Passing messages between a single source and a single target lets us built forwarders and pipelines, but that’s about it. What if you want to take a message from a source and send a copy to several targets? You can do that, and an operator for that is called broadcast. It looks like this:

source -<< targets;

The right-hand side of the operator, variable targets can be defined as an array, list or IEnumerable of ITarget. We will use an umbrella-term term vector to describe them. In contrast, single sources and targets will be called scalars.

Because vectors are so common in Maestro network expressions, there is a convenient syntax for them, which is similar to inline array initializers in C#:

source -<< { target1, target2, target3 };

This statement sets up a network that propagates all messages coming out of source to target1, target2 and target3.

Another operator that operates on a scalar source and a vector target is called alternate. The alternate, written as “-<:” is useful in the scenarios where we want to load-balance work. As the data comes out of the source, alternate “round-robins” it between the targets. For example, you might want to use alternate to implement a pool of “workers” to handle incoming data. Here is how you do it:

ITarget<int>[] workers;
...
source -<: workers;

Finally, Maestro has two operators for propagating data from a vector source to a scalar target. One of them is the multiplex, written as “>>-”. The multiplex forwards data from each of the sources into the target, as soon as the data is available. Unlike the multiplex, combine operator “&>-” waits until all sources have produced data, then joins it into an array and propagates it to the target. Example:

{ num1, num2, num3 } &>- PrintManyNumbers;
...
void PrintManyNumbers(int[] nums)
{
foreach(var i in nums)
Console.WriteLine(i);
}

The beauty of the network expressions in Maestro lies in their composability. The operators make it easy to put several expressions together; you only need to make sure that the types of the operands “line up”.

To illustrate the point, consider a network that calculates the sum of two numbers. After everything I’ve told you about the dataflow networks, you probably expect something more sophisticated than a simple expression a+b. You won’t be disappointed – I’m going to go all out and put as many network operators as possible in a single statement. Enjoy:

ISourceAndTarget<int[]> join;
ISource<int> a;
ISource<int> b;
ITarget<int> sum;
...

{ a ==> TraceNumber, b ==> TraceNumber }
&>- join -<: { GetSum, GetSum } >>- sum;

...
private int TraceNumber(int n)
{
    Console.WriteLine("Got number {0}", n);
    return n;
}

private function int GetSum(int[] nums)
{
    int result=0;
    foreach(var num in nums) result += num;
    return result;
}

Here the data coming out of sources a and b is first forwarded to the TraceNumber method. This is a transform that prints the number on the console and then forwards it on. Next, the numbers are combined in the array join. From there, the data alternates between the two GetSum transforms. The alternation becomes useful when you send a stream of data through the network, allowing multiple GetSum functions to execute in parallel. Finally, the result is multiplexed in the resulting target sum.

This example is obviously far-fetched. In a real program we need to do something more substantial than adding a few numbers to amortize the overhead of setting up the network and spawning off the transforms. You also probably won’t have to construct such contrived networks when programming in Maestro, but it sure is nice to know what’s possible!

I hope you found this useful. Thanks, and as always, we’re looking for your comments.

Artur Laksberg,
Parallel Computing Team

Comments

  • Anonymous
    February 27, 2009
    Sorry but what is Maestro? Is it a code name for a new project?

  • Anonymous
    February 27, 2009
    Hi Kris - Welcome to Maestro team blog! Maestro is a new .Net programming language based on asynchronous agents, isolation and message-passing. Check out the first post of this blog: http://blogs.msdn.com/maestroteam/archive/2009/02/27/we-haven-t-forgotten-about-other-models-honest.aspx Artur

  • Anonymous
    February 27, 2009
    Why did you decide to create various pipelining operators instead of allowing composition of pipelines? For example, for load-balacning, use the xor operator ^: src ==> ( endpt1 ^ endpt 2 ) for parallel composition, use the and operator &: src ==> ( endpt1 & endpt2 ) A join would be similar, but with the pipeline composition on the left hand side: ( src1 & src2 ) ==> endpt And so forth...

  • Anonymous
    February 27, 2009
    How do you handle functions with multiple arguments? E.g.: function int[] Add(int value, int[] nums) {  return nums.Select( n => n + value ); } ... multiple inputs? (1,nums) ==> Add currying? nums ==> x => Add( 1, x ) Or: function int[] PairwiseAdd(int[] a, int[] b) {  return [0..a.Length].Select( i => a[i] + b[i] ); } ... ? (nums,nums) ==> PairwiseAdd What about multiple return values? function (int[],int[]) AddAndMultiply(int value, int[] nums) {  return (    nums.Select( n => n + value ),    nums.Select( n => n * value )  ); } Also, the examples above use linq syntax, which executes a single function over multiple values, whereas most of the network operators pass multiple values to a single function. Is the forwarding operator overloaded so that "nums ==> n => n + value" would work? Or, more explicitly: int value; ITarget<int> add = (n => n + value); ISource<int[]> nums; int[] added = (nums ==> add); ? By the way, interesting stuff :)

  • Anonymous
    February 27, 2009
    Since functions can't call functions defined in other agents, what is the means of using functions from third party frameworks (if indeed it's currently possible)? Is there any way for us to play with the bits? Do you have a syntax for forwarding event streams?  For example ISource<MouseEvent> mouseMoves = uiElement.MouseMove; // MouseMove = event mouseMoves ==> writeToConsole How about streams? ISource<byte> stream = new SerialPort("COM1")GetStream(); // or something like that.  I forget the syntax :)

  • Anonymous
    February 27, 2009
    Steve, I like your syntax: intuitive, easy to remember. Nice symmetry between the broadcast and the join. How would I use that syntax for vectors of arbitrary lenght? For example: ITarget<int>[] myTargets = ...; Now load-balance a source between myTargets. BTW, this comes up often when the number of sources/targets is not known statically.

  • Anonymous
    February 27, 2009
    Emperor XLII: As we currently have it, you can't use methods with multiple parameters in network expression -- because once you allow it, you have to allow mutlitple returns, as you pointed out. Definitely possible, but since this is the first version of the language, we wanted to keep it simple. We have a way to "package" multiple elements into a single container called "schema", that should help somewhat (I didn't talk about it in this post, but one of us will try to cover it later). We also thought about using function currying in a network expression, where you would supply extra arguments explicitly. The syntax could look like this: source ==> ProcessData( 10, 20, _ ) where "_" stands for the payload of the message. The payload is still one element, but now you can use methods with multiple parameters. Again, maybe in the next version. Thanks for your comment!

  • Anonymous
    February 27, 2009
    The comment has been removed

  • Anonymous
    February 27, 2009
    More to Emperor XLII: int[] added = (nums ==> add); This almost works. You'll need to combine the result into an array of ints using oiperator "&>-" (see my example with PrintManyNumbers). And yes, anonymous methods also work, exactly how you wrote it.

  • Anonymous
    February 27, 2009
    When will we get to play with this, finally? Consider making it available, at least like those unsupported stuffs.

  • Anonymous
    February 27, 2009
    The comment has been removed

  • Anonymous
    February 28, 2009
    Very interesting stuff. Thanks for creating the blog, subscribed. Looking forward to what comes out of this. We need some innovation in the parallel programming area.

  • Anonymous
    March 02, 2009
    @Niklas Gustafsson: The tuple idea makes a lot of sense to me, and I can see it work well with currying to customize how multiple arguments are bound, in addition to acting as a simple placeholder.  I.e. "(source) ==> A( _, 10 )" as well as "(tuple source) ==> B( _[1], 10, _[2] )". @artur.laksberg: For reference, here's a slightly updated version of my original example (using currying instead of an anonymous method): ISource<int[]> nums = ...; function int Add( int n, int value ) { return n + value; } // (incorrect) int[] added = nums ==> Add( _, 1 ); Is this the correct way, using the "&>-" operator? int[] added; nums ==> Add( _, 1 ) &>- added; Would this also work? int[] added -<& nums ==> Add( _, 1 ); Or maybe: int[] added =<& nums ==> Add( _, 1 ); And thanks for your replies!

  • Anonymous
    March 02, 2009
    The comment has been removed

  • Anonymous
    March 02, 2009
    Artur, Since (A & B) is a pipeline composition, it's return type is an ITarget<T>.  So, if you need to create a join/load-balance pipeline over a variable number of sources/targets, the following would be a solution: ITarget<int>[] myTargets = ...; ITarget<int> loadbalancer = Pipelines.LoadBalance(myTargets); where LoadBalance could be implemented as follows (terrible impl; just for illustration): ITarget<T> LoadBalance<T>(ITarget<T> first, params ITarget<T>[] next) {    var results = first;    for(int i = 0; i < next.Length; ++i) result &= next[i];    return result; }

  • Anonymous
    March 03, 2009
    Emperor XLII: To clarify a couple of points... The following would not work as written: nums ==> Add( _, 1 ) &>- added; This is because 'added' is not an ITarget -- it's an array of ints. But, it would easy to fix with an anonymous method: nums ==> Add( _, 1 ) &>- (x => added = x); The compiler recognizes methods as transforms or sinks turning them into sources and/or targets, but not arbitrary expressions. (Otherwise, what would happen when multiple messages are passed over the network?) For the same reason, a statement like this: int[] added = <network expression> would also generate an error, since the expression on the left hand side is an array, which isn't a target or a source. I hope that clarifies it, but feel free to ask more! Artur

  • Anonymous
    March 03, 2009
    Steve, Thanks for your comments. We want to be able to chain up multiple network operators and have them work naturally with variable size vectors. For example: start -<: workers &>- end Where workers is a vector of a transform type. Artur

  • Anonymous
    March 03, 2009
    How do your combinators compare to Arrow combinators from Haskell?  The idea seems similar.

  • Anonymous
    May 05, 2009
    We rarely speak of parallelism when discussing Axum, because you very rarely think about it when designing

  • Anonymous
    May 06, 2009
    The subject of immutability sparks intense interest among the people who follow our blog, as is evident

  • Anonymous
    May 07, 2009
    Hi. Nice stuff. I got three questions, though (is this the right place for them?): Is it possible to reconfigure the network at runtime?

  • add or remove links between functions
  • enable or disable links between functions What are the semantics of &>-? Consider a scenario with 3 ISource<int>'s x, y and z where x has an update rate of, say, one value every second and y and z updating every 200 ms? Which values are going to be in the result? Can one manually synchronize the data received by the sources or are built-in operators the only way to go? Do you have any performance metrics for running on a Windows CE device?
  • Anonymous
    July 17, 2009
    Hi, great job! Hopefully we will get our own dataflow language But i have few questions: What's the difference between CCR and Maestro, except obvious of course such as "library vs language" or "functional vs imperative". http://www.microsoft.com/ccrdss/ Why would someone prefer using Maestro to CCR/DSS? What extra powers Maestro gives to us over CCR? How are you handling exceptions, failures, roll backs? Especially in complex pipelines - for example multiple chained targets. Thanks.