Procedura: implementare vari modelli producer-consumer

In questo argomento viene descritto come implementare il modello producer-consumer nell'applicazione. In questo modello il producer invia i messaggi a un blocco dei messaggi e il consumer legge i messaggi da tale blocco.

Nell'argomento vengono illustrati due scenari. Nel primo scenario il consumer deve ricevere ogni messaggio inviato dal producer. Nel secondo scenario il consumer esegue periodicamente il polling dei dati e pertanto non deve ricevere ogni messaggio.

In entrambi gli esempi di questo argomento vengono utilizzati gli agenti, i blocchi dei messaggi e le funzioni di passaggio dei messaggi per trasmettere i messaggi dal producer al consumer. L'agente del producer utilizza la funzione Concurrency::send per scrivere i messaggi in un oggetto Concurrency::ITarget. L'agente del consumer utilizza la funzione Concurrency::receive per leggere i messaggi da un oggetto Concurrency::ISource. Entrambi gli agenti contengono un valore sentinel per coordinare la fine dell'elaborazione.

Per ulteriori informazioni sugli agenti asincroni, vedere Agenti asincroni. Per ulteriori informazioni sui blocchi dei messaggi e sulle funzioni di passaggio dei messaggi, vedere Blocchi dei messaggi asincroni e Funzioni di passaggio dei messaggi.

Esempio

In questo esempio l'agente del producer invia una serie di numeri all'agente del consumer. Il consumer riceve ognuno di questi numeri e ne calcola la media. L'applicazione scrive la media nella console.

In questo esempio viene utilizzato un oggetto Concurrency::unbounded_buffer per consentire al producer di accodare i messaggi. La classe unbounded_buffer implementa ITarget e ISource in modo tale che il producer e il consumer possano inviare e ricevere i messaggi da un buffer condiviso. Le funzioni send e receive coordinano l'attività di propagazione dei dati dal producer al consumer.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Questo esempio produce l'output che segue.

The average is 50.

In questo esempio l'agente del producer invia una serie di quotazioni di Bora all'agente del consumer. L'agente del consumer legge periodicamente la quotazione corrente e la visualizza sulla console.

Questo esempio è simile al precedente, ad eccezione del fatto che viene utilizzato l'oggetto Concurrency::overwrite_buffer per consentire al producer di condividere un messaggio con il consumer. Come nell'esempio precedente, la classe overwrite_buffer implementa ITarget e ISource in modo tale che il producer e il consumer possano agire su un buffer dei messaggi condiviso.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (quotes.begin(), quotes.end(), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         Concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         Concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Questo esempio produce l'output seguente:

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Diversamente da un oggetto unbounded_buffer, la funzione receive non rimuove il messaggio dall'oggetto overwrite_buffer. Se il consumer legge più volte dal buffer dei messaggi prima che il producer sovrascriva il messaggio, il destinatario otterrà ogni volta lo stesso messaggio.

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio o incollarlo in un file denominato producer-consumer.cpp, quindi eseguire il comando seguente in una finestra del prompt dei comandi di Visual Studio 2010.

cl.exe /EHsc producer-consumer.cpp

Vedere anche

Concetti

Libreria di agenti asincroni

Agenti asincroni

Blocchi dei messaggi asincroni

Funzioni di passaggio dei messaggi