Best Practices in the Asynchronous Agents Library

This document describes how to make effective use of the Asynchronous Agents Library. The Agents Library promotes an actor-based programming model and in-process message passing for coarse-grained dataflow and pipelining tasks.

For more information about the Agents Library, see Asynchronous Agents Library.

Sections

This document contains the following sections:

  • Use Agents to Isolate State

  • Use a Throttling Mechanism to Limit the Number of Messages in a Data Pipeline

  • Do Not Perform Fine-Grained Work in a Data Pipeline

  • Do Not Pass Large Message Payloads by Value

  • Use shared_ptr in a Data Network When Ownership Is Undefined

Use Agents to Isolate State

The Agents Library provides alternatives to shared state by letting you connect isolated components through an asynchronous message-passing mechanism. Asynchronous agents are most effective when they isolate their internal state from other components. When state is isolated, multiple components do not typically act on shared data. State isolation can enable your application to scale because it reduces contention on shared memory. It also reduces the chance of deadlock and race conditions because components do not have to synchronize access to shared data.

You typically isolate state in an agent by holding data members in the private or protected sections of the agent class and by using message buffers to communicate state changes. The following example shows the basic_agent class, which derives from Concurrency::agent. The basic_agent class uses two message buffers to communicate with external components. One message buffer holds incoming messages; the other message buffer holds outgoing messages.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrieves the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
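      // NOTE: This skeleton never leaves the loop above, so done() is not
      // reached. A real agent needs an exit condition (for example, a
      // sentinel message) so that it can move to the finished state.
      done();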
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

For complete examples about how to define and use agents, see Walkthrough: Creating an Agent-Based Application and Walkthrough: Creating a Dataflow Agent.

Use a Throttling Mechanism to Limit the Number of Messages in a Data Pipeline

Many message-buffer types, such as Concurrency::unbounded_buffer, can hold an unlimited number of messages. When a message producer sends messages to a data pipeline faster than the consumer can process these messages, the application can enter a low-memory or out-of-memory state. You can use a throttling mechanism, such as a semaphore, to limit the number of messages that are concurrently active in a data pipeline.

The following basic example demonstrates how to use a semaphore to limit the number of messages in a data pipeline. The data pipeline uses the Concurrency::wait function to simulate an operation that takes at least 100 milliseconds. Because the sender produces messages faster than the consumer can process those messages, this example defines the semaphore class to enable the application to limit the number of active messages.

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (InterlockedDecrement(&_semaphore_count) < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (InterlockedIncrement(&_semaphore_count) <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count)
       : _current(static_cast<long>(count)) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0L)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(InterlockedDecrement(&_current) == 0L) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(InterlockedIncrement(&_current) == 1L) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main application that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}

This example produces the following sample output:

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

The semaphore object limits the pipeline to processing at most two messages at the same time.

The producer in this example sends relatively few messages to the consumer. Therefore, this example does not demonstrate a potential low-memory or out-of-memory condition. However, this mechanism is useful when a data pipeline contains a relatively high number of messages.

For more information about how to create the semaphore class that is used in this example, see How to: Use the Context Class to Implement a Cooperative Semaphore.
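
The throttling idea itself does not depend on the Concurrency Runtime. The following is a minimal sketch in standard C++ only, under several assumptions: it uses std::thread, std::mutex, and std::condition_variable in place of the cooperative Context-based semaphore, so it blocks operating-system threads rather than yielding cooperatively, and a std::queue stands in for the message buffer. The names blocking_semaphore, throttle_result, and run_throttled_pipeline are hypothetical and are not part of the Agents Library.

```cpp
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical portable semaphore. Unlike the cooperative semaphore in the
// example above, it blocks the calling thread when no capacity remains.
class blocking_semaphore
{
public:
   explicit blocking_semaphore(int capacity) : _count(capacity) {}

   // Waits until a slot is free, then takes it.
   void acquire()
   {
      std::unique_lock<std::mutex> lock(_mutex);
      _cv.wait(lock, [this] { return _count > 0; });
      --_count;
   }

   // Returns a slot and wakes one waiting thread.
   void release()
   {
      {
         std::lock_guard<std::mutex> lock(_mutex);
         ++_count;
      }
      _cv.notify_one();
   }

private:
   std::mutex _mutex;
   std::condition_variable _cv;
   int _count;
};

struct throttle_result
{
   int processed;      // Number of messages the consumer handled.
   int max_in_flight;  // Largest number of messages active at one time.
};

// Sends message_count integers to one consumer thread while allowing at
// most `capacity` messages to be in flight.
throttle_result run_throttled_pipeline(int message_count, int capacity)
{
   blocking_semaphore slots(capacity);
   std::mutex m;
   std::condition_variable cv;
   std::queue<int> pending;
   int in_flight = 0, max_in_flight = 0, processed = 0;

   std::thread consumer([&] {
      for (int i = 0; i < message_count; ++i)
      {
         {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return !pending.empty(); });
            pending.pop();
         }
         // A real stage would do its time-consuming work here.
         {
            std::lock_guard<std::mutex> lock(m);
            --in_flight;
            ++processed;
         }
         // Enable the sender to send the next message.
         slots.release();
      }
   });

   for (int i = 0; i < message_count; ++i)
   {
      slots.acquire();  // Blocks while `capacity` messages are active.
      {
         std::lock_guard<std::mutex> lock(m);
         pending.push(i);
         ++in_flight;
         max_in_flight = std::max(max_in_flight, in_flight);
      }
      cv.notify_one();
   }

   consumer.join();
   return { processed, max_in_flight };
}
```

Calling run_throttled_pipeline(50, 2) should process all 50 messages while never having more than two active at once. The sender takes a slot before it enqueues a message and the consumer returns the slot only after it finishes, which is the same acquire-then-release pairing that the example above uses.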

Do Not Perform Fine-Grained Work in a Data Pipeline

The Agents Library is most useful when the work that is performed by a data pipeline is fairly coarse-grained. For example, one application component might read data from a file or a network connection and occasionally send that data to another component. The protocol that the Agents Library uses to propagate messages causes the message-passing mechanism to have more overhead than the task parallel constructs that are provided by the Parallel Patterns Library (PPL). Therefore, make sure that the work that is performed by a data pipeline is long enough to offset this overhead.

Although a data pipeline is most effective when its tasks are coarse-grained, each stage of the data pipeline can use PPL constructs such as task groups and parallel algorithms to perform more fine-grained work. For an example of a coarse-grained data network that uses fine-grained parallelism at each processing stage, see Walkthrough: Creating an Image-Processing Network.
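
One possible way to keep each message coarse-grained is to group small work items into batches, so that the per-message overhead is paid once per batch rather than once per item. The following is only a sketch in standard C++ under that assumption. The functions make_batches and process_batch are hypothetical helpers, not part of the Agents Library or the PPL. In a real pipeline, each batch would be the payload of one message, and a stage could process the elements of a batch with a PPL parallel algorithm.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Splits `items` into consecutive batches of at most `batch_size` elements.
// Returns no batches when batch_size is zero.
std::vector<std::vector<int>> make_batches(const std::vector<int>& items,
                                           std::size_t batch_size)
{
   std::vector<std::vector<int>> batches;
   if (batch_size == 0)
   {
      return batches;
   }

   for (std::size_t i = 0; i < items.size(); i += batch_size)
   {
      const std::size_t end = std::min(items.size(), i + batch_size);
      batches.emplace_back(
         items.begin() + static_cast<std::ptrdiff_t>(i),
         items.begin() + static_cast<std::ptrdiff_t>(end));
   }
   return batches;
}

// Stands in for the work that one pipeline stage performs on one message.
// Here it only sums the batch; a real stage would do something heavier.
long long process_batch(const std::vector<int>& batch)
{
   return std::accumulate(batch.begin(), batch.end(), 0LL);
}
```

For example, ten items with a batch size of four yield three batches of four, four, and two items. The right batch size depends on how expensive each item is, so it is worth measuring rather than guessing.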

Do Not Pass Large Message Payloads by Value

In some cases, the runtime creates a copy of every message that it passes from one message buffer to another message buffer. For example, the Concurrency::overwrite_buffer class offers a copy of every message that it receives to each of its targets. The runtime also creates a copy of the message data when you use message-passing functions such as Concurrency::send and Concurrency::receive to write messages to and read messages from a message buffer. Although this mechanism helps eliminate the risk of concurrently writing to shared data, it could lead to poor memory performance when the message payload is relatively large.

You can use pointers or references to improve memory performance when you pass messages that have a large payload. The following example compares passing large messages by value to passing pointers to the same message type. The example defines two agent types, producer and consumer, that act on message_data objects. It measures the time that the producer needs to send several message_data objects to the consumer, and compares that with the time that the producer needs to send pointers to message_data objects instead.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

This example produces the following sample output:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

The version that uses pointers performs better because it eliminates the requirement for the runtime to create a full copy of every message_data object that it passes from the producer to the consumer.
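
Timing results vary from machine to machine, so it can also help to count copies directly. The following sketch does that in standard C++ under stated assumptions: a std::queue stands in for a message buffer, so the number of copies it shows is not necessarily the number that the runtime makes, and std::unique_ptr is used only to keep the sketch free of manual delete calls. The names payload, copies_by_value, and copies_by_pointer are hypothetical.

```cpp
#include <cstring>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical payload that counts how many times it is copied.
struct payload
{
   unsigned char binary_data[32768];

   payload() : binary_data() {}
   payload(const payload& other)
   {
      std::memcpy(binary_data, other.binary_data, sizeof(binary_data));
      ++copy_count;
   }

   static int copy_count;
};

int payload::copy_count = 0;

// Passes payload objects through the queue by value and returns the
// number of copy constructions that this caused.
int copies_by_value(int message_count)
{
   payload::copy_count = 0;
   std::queue<payload> buffer;

   for (int i = 0; i < message_count; ++i)
   {
      payload message;
      buffer.push(message);               // Copies the payload into the queue.
   }
   while (!buffer.empty())
   {
      payload received = buffer.front();  // Copies the payload out again.
      buffer.pop();
      (void)received;
   }
   return payload::copy_count;
}

// Passes pointers to payload objects through the queue and returns the
// number of payload copy constructions that this caused.
int copies_by_pointer(int message_count)
{
   payload::copy_count = 0;
   std::queue<std::unique_ptr<payload>> buffer;

   for (int i = 0; i < message_count; ++i)
   {
      buffer.push(std::unique_ptr<payload>(new payload()));
   }
   while (!buffer.empty())
   {
      // Only the pointer moves; the 32 KB payload stays where it is.
      std::unique_ptr<payload> received = std::move(buffer.front());
      buffer.pop();
   }
   return payload::copy_count;
}
```

In this sketch, sending 100 messages by value copies the payload 200 times (once on the way in and once on the way out), whereas sending 100 pointers copies it zero times.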

Use shared_ptr in a Data Network When Ownership Is Undefined

When you send messages by pointer through a message-passing pipeline or network, you typically allocate the memory for each message at the front of the network and free that memory at the end of the network. Although this mechanism frequently works well, there are cases in which it is difficult or not possible to use it. For example, consider the case in which the data network contains multiple end nodes. In this case, there is no clear location to free the memory for the messages.

To solve this problem, you can use a mechanism such as std::shared_ptr that enables a pointer to be owned by multiple components. When the final shared_ptr object that owns a resource is destroyed, the resource is also freed.

The following example demonstrates how to use shared_ptr to share pointer values among multiple message buffers. The example connects a Concurrency::overwrite_buffer object to three Concurrency::call objects. The overwrite_buffer class offers messages to each of its targets. Because there are multiple owners of the data at the end of the data network, this example uses shared_ptr to enable each call object to share ownership of the messages.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

This example produces the following sample output:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
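
The sample output shows each resource being destroyed only after both receivers have finished with it. The ownership rule behind that behavior can be observed without any message buffers. The following is a minimal sketch in standard C++. The type tracked_resource and the function destruction_timeline are hypothetical, and the two local shared_ptr variables merely play the role of the receivers.

```cpp
#include <memory>
#include <vector>

// Hypothetical resource that reports its destruction through a counter.
struct tracked_resource
{
   tracked_resource(int resource_id, int& destroyed)
      : id(resource_id), _destroyed(destroyed)
   {
   }
   ~tracked_resource() { ++_destroyed; }

   int id;

private:
   int& _destroyed;
};

// Releases three owners of one resource in turn and records, after each
// release, how many times the resource has been destroyed.
std::vector<int> destruction_timeline()
{
   int destroyed = 0;
   std::vector<int> timeline;

   // The sender creates the resource and two receivers share ownership.
   auto sender = std::make_shared<tracked_resource>(42, destroyed);
   std::shared_ptr<tracked_resource> receiver1 = sender;
   std::shared_ptr<tracked_resource> receiver2 = sender;

   sender.reset();      // Two owners remain.
   timeline.push_back(destroyed);

   receiver1.reset();   // One owner remains.
   timeline.push_back(destroyed);

   receiver2.reset();   // The last owner is gone; the resource is freed.
   timeline.push_back(destroyed);

   return timeline;
}
```

The recorded values are 0, 0, and 1: the resource survives until the last owner releases it, regardless of which component that is. This is why no single node in the network has to be designated as the place that frees the message.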

See Also

Tasks

Walkthrough: Creating an Agent-Based Application

Walkthrough: Creating a Dataflow Agent

Walkthrough: Creating an Image-Processing Network

Concepts

Asynchronous Agents Library

Best Practices in the Parallel Patterns Library

General Best Practices in the Concurrency Runtime

Other Resources

Concurrency Runtime Best Practices