Como: Implementar vários padrões de produtor de consumidor

Este tópico descreve como implementar o padrão de consumidor-produtor em seu aplicativo. Nesse padrão, o produtor envia mensagens para um bloco de mensagens e o consumidor lê mensagens a partir desse bloco.

O tópico demonstra dois cenários. No primeiro cenário, o consumidor deve receber cada mensagem que o producer envia. No segundo cenário, o consumidor procura periodicamente os dados e, portanto, não precisa receber cada mensagem.

Ambos os exemplos neste tópico usam agentes, blocos de mensagens e funções de transmissão de mensagens para transmitir mensagens do produtor para o consumidor. O agente do producer utiliza a Concurrency::send função para escrever mensagens para um Concurrency::ITarget objeto. O agente do consumidor usa a Concurrency::receive função para ler mensagens de um Concurrency::ISource objeto. Os dois agentes mantêm um valor de Sentinela para coordenar o final do processamento.

Para obter mais informações sobre agentes assíncronos, consulte Agentes assíncronos. Para obter mais informações sobre funções de transmissão de mensagens e de blocos de mensagens, consulte Blocos de mensagens assíncronas e Funções de transmissão de mensagens.

Exemplo

Neste exemplo, o agente do producer envia uma série de números para o agente do consumidor. O consumidor recebe cada um desses números e computa sua média. O aplicativo grava a média no console.

Este exemplo usa um Concurrency::unbounded_buffer objeto para ativar o produtor de mensagens da fila. O unbounded_buffer classe implementa ITarget e ISource para que o produtor e consumidor podem enviar e receber mensagens de e para um buffer compartilhado. O send e receive funções coordenam a tarefa de propagar os dados do produtor para o consumidor.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

O exemplo produz a seguinte saída.

The average is 50.

Neste exemplo, o agente do producer envia uma série de cotações de ações para o agente do consumidor. Periodicamente, o agente do consumidor lê a cotação atual e imprime no console.

Este exemplo semelhante ao anterior, exceto que ele usa um Concurrency::overwrite_buffer objeto para ativar o produtor compartilhar uma mensagem com o consumidor. Como no exemplo anterior, overwrite_buffer classe implementa ITarget e ISource para que o produtor e consumidor podem atuar em um buffer de mensagem compartilhada.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (quotes.begin(), quotes.end(), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         Concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         Concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Este exemplo produz a saída de exemplo a seguir.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Diferentemente com um unbounded_buffer objeto, o receive função não remove a mensagem a partir do overwrite_buffer objeto. Se o consumidor lê de buffer de mensagem, mais uma vez antes do produtor substitui essa mensagem, o receptor obtém a mesma mensagem de cada vez.

Compilando o código

Copie o código de exemplo e colá-lo em um Visual Studio do projeto, ou colá-lo em um arquivo que é chamado consumer.cpp de produtor e, em seguida, execute o seguinte comando um Visual Studio 2010 janela do Prompt de comando.

cl.exe /EHsc producer-consumer.cpp

Consulte também

Conceitos

Biblioteca de agentes assíncronos

Agentes assíncronos

Blocos de mensagens assíncronas

Funções de transmissão de mensagens