Como implementar vários padrões de produtor-consumidor
Este tópico descreve como implementar o padrão produtor-consumidor em seu aplicativo. Nesse padrão, o produtor envia mensagens a um bloco de mensagens e o consumidor lê mensagens nesse bloco.
O tópico demonstra dois cenários. No primeiro cenário, o consumidor deve receber cada mensagem enviada pelo produtor. No segundo cenário, o consumidor pesquisa periodicamente dados e, portanto, não precisa receber cada mensagem.
Ambos os exemplos neste tópico usam agentes, blocos de mensagens e funções de passagem de mensagens para transmitir mensagens do produtor para o consumidor. O agente de produtor usa a função concurrency::send para gravar mensagens em um objeto concurrency::ITarget. O agente do consumidor usa a função concurrency::receive para ler mensagens de um objeto concurrency::ISource. Ambos os agentes contêm um valor sentinela para coordenar o fim do processamento.
Para mais informações sobre agentes assíncronos, confira Agentes Assíncronos. Para mais informações sobre blocos de mensagens e funções de passagem de mensagens, confira Blocos de Mensagens Assíncronos e Funções de Passagem de Mensagem.
Exemplo: enviar série de números para o agente de consumidor
Neste exemplo, o agente de produtor envia uma série de números para o agente do consumidor. O consumidor recebe cada um desses números e calcula sua média. O aplicativo grava a média no console.
Este exemplo usa um objeto concurrency::unbounded_buffer para permitir que o produtor enfileire as mensagens. A classe unbounded_buffer
implementa ITarget
e ISource
para que o produtor e o consumidor possam enviar e receber mensagens de e para um buffer compartilhado. As funções send
e receive
coordenam a tarefa de propagar os dados do produtor para o consumidor.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
Este exemplo gerencia a seguinte saída.
The average is 50.
Exemplo: enviar série de cotações de ações para o agente do consumidor
Neste exemplo, o agente produtor envia uma série de cotações de ações para o agente consumidor. O agente de consumidor lê periodicamente a cotação atual e imprime-a no console.
Este exemplo é semelhante ao anterior, exceto que ele usa um objeto concurrency::overwrite_buffer para permitir que o produtor compartilhe uma mensagem com o consumidor. Como no exemplo anterior, a classe overwrite_buffer
implementa ITarget
e ISource
para que o produtor e o consumidor possam agir em um buffer de mensagens compartilhadas.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
Este exemplo gera a saída de amostra a seguir.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
Ao contrário de um objeto unbounded_buffer
, a função receive
não remove a mensagem do objeto overwrite_buffer
. Se o consumidor ler do buffer de mensagem mais de uma vez antes que o produtor substitua essa mensagem, o receptor obterá a mesma mensagem todas as vezes.
Compilando o código
Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole-o em um arquivo chamado producer-consumer.cpp
e execute o comando a seguir em uma janela do Prompt de comando do Visual Studio.
cl.exe /EHsc producer-consumer.cpp
Confira também
Biblioteca de agentes assíncronos
Agentes assíncronos
Blocos de mensagens assíncronos
Funções de transmissão de mensagem