Gewusst wie: Implementieren verschiedener Producer-Consumer-Muster

In diesem Thema wird die Implementierung des Producer-Consumer-Musters in der Anwendung beschrieben. Bei diesem Muster sendet der Producer Nachrichten an einen Nachrichtenblock, während der Consumer Nachrichten aus diesem Block ausliest.

In diesem Thema werden zwei Szenarien beschrieben. Im ersten Szenario muss der Consumer alle Nachrichten empfangen, die der Producer sendet. Im zweiten Szenario ruft der Consumer in regelmäßigen Abständen Daten ab und muss daher nicht jede Nachricht empfangen.

In beiden Beispielen in diesem Thema werden Nachrichten mithilfe von Agents, Nachrichtenblöcken und Nachrichtenübergabefunktionen vom Producer an den Consumer übertragen. Der Producer-Agent schreibt Nachrichten mit der Concurrency::send-Funktion in ein Concurrency::ITarget-Objekt. Der Consumer-Agent liest Nachrichten mit der Concurrency::receive-Funktion aus einem Concurrency::ISource-Objekt. Beide Agents enthalten einen Sentinelwert, um das Ende der Verarbeitung zu koordinieren.

Weitere Informationen zu asynchronen Agents finden Sie unter Asynchrone Agents. Weitere Informationen zu Nachrichtenblöcken und Nachrichtenübergabefunktionen finden Sie unter Asynchrone Nachrichtenblöcke und unter Funktionen zum Übergeben von Meldungen.

Beispiel

In diesem Beispiel sendet der Producer-Agent eine Reihe von Zahlen an den Consumer-Agent. Diese werden vom Consumer empfangen und ihr Durchschnitt wird berechnet. Anschließend wird der Durchschnitt in die Konsole geschrieben.

Bei diesem Beispiel kann der Producer mithilfe eines Concurrency::unbounded_buffer-Objekts Nachrichten in eine Warteschlange stellen. Die unbounded_buffer-Klasse implementiert ITarget und ISource, damit Producer und Consumer Nachrichten über einen gemeinsamen Puffer senden und empfangen können. Die Funktionen send und receive koordinieren die Aufgabe, die Daten vom Producer an den Consumer weiterzugeben.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Folgende Ergebnisse werden zurückgegeben:

The average is 50.

In diesem Beispiel sendet der Producer-Agent eine Reihe von Aktienkursen an den Consumer-Agent. Der Consumer-Agent liest den aktuellen Kurs in regelmäßigen Abständen und gibt ihn an der Konsole aus.

Dieses Beispiel gleicht dem vorherigen, wobei hier jedoch der Producer mithilfe eines Concurrency::overwrite_buffer-Objekts eine Nachricht mit dem Consumer gemeinsam verwenden kann. Wie im vorherigen Beispiel implementiert die overwrite_buffer-Klasse ITarget und ISource, sodass Producer und Consumer einen gemeinsamen Nachrichtenpuffer nutzen können.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (quotes.begin(), quotes.end(), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         Concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         Concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Dieses Beispiel erzeugt die folgende Beispielausgabe.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Anders als bei einem unbounded_buffer-Objekt entfernt die receive-Funktion die Nachricht nicht aus dem overwrite_buffer-Objekt. Wenn der Consumer den Nachrichtenpuffer mehr als einmal ausliest, bevor der Producer diese Nachricht überschreibt, erhält der Empfänger jedes Mal dieselbe Nachricht.

Kompilieren des Codes

Kopieren Sie den Beispielcode, und fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie ihn in eine Datei mit dem Namen producer_consumer.cpp ein, und führen Sie dann den folgenden Befehl in einem Visual Studio 2010-Eingabeaufforderungsfenster aus.

cl.exe /EHsc producer-consumer.cpp

Siehe auch

Konzepte

Asynchronous Agents Library

Asynchrone Agents

Asynchrone Nachrichtenblöcke

Funktionen zum Übergeben von Meldungen