방법: 다양한 공급자/소비자 패턴 구현

이 항목에서는 응용 프로그램에서 공급자/소비자 패턴을 구현하는 방법에 대해 설명합니다.이 패턴에서 공급자는 메시지 블록에 메시지를 보내고 소비자는 해당 블록에서 메시지를 읽습니다.

이 항목에서는 두 가지 시나리오를 보여 줍니다.첫 번째 시나리오에서 소비자는 공급자가 보내는 각 메시지를 받아야 합니다.두 번째 시나리오에서 소비자는 주기적으로 데이터를 폴링하므로 각 메시지를 받을 필요가 없습니다.

이 항목에 나오는 두 예제에서는 모두 에이전트, 메시지 블록 및 메시지 전달 함수를 사용하여 공급자가 소비자에게 메시지를 전송합니다.생산자 에이전트가 사용 하는 concurrency::send 메시지를 작성 하는 함수는 concurrency::ITarget 개체.소비자 에이전트가 사용 하는 concurrency::receive 함수에서 메시지를 읽을 수는 concurrency::ISource 개체입니다.두 에이전트 모두 처리의 끝을 조정하는 데 사용할 센티널 값을 포함합니다.

비동기 에이전트에 대한 자세한 내용은 비동기 에이전트를 참조하십시오.메시지 블록 및 메시지 전달 함수에 대한 자세한 내용은 비동기 메시지 블록메시지 전달 함수를 참조하십시오.

예제

이 예제에서 공급자 에이전트는 일련의 숫자를 소비자 에이전트에게 보냅니다.소비자는 이러한 각 숫자를 받아 평균 값을 계산합니다.그러면 응용 프로그램에서 평균 값을 콘솔에 씁니다.

예제는 concurrency::unbounded_buffer 생산자에 메시지 큐를 사용 하는 개체입니다.unbounded_buffer 클래스는 ITargetISource를 구현하여 공급자와 소비자가 공유 버퍼에 메시지를 보내거나 공유 버퍼에서 메시지를 받을 수 있도록 합니다.sendreceive 함수는 공급자에서 소비자로 데이터를 전파하는 작업을 조정합니다.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

이 예제의 결과는 다음과 같습니다.

The average is 50.

이 예제에서 공급자 에이전트는 일련의 주식 시세를 소비자 에이전트에게 보냅니다.소비자 에이전트는 현재 시세를 주기적으로 읽고 콘솔에 출력합니다.

사용 하는 점을 제외 하 고 이전이 예제와 비슷합니다.는 concurrency::overwrite_buffer 생산자 소비자와 하나의 메시지를 공유할 수 있도록 하는 개체입니다.이전 예제와 같이 overwrite_buffer 클래스는 ITargetISource를 구현하여 공급자와 소비자가 공유 메시지 버퍼에서 동작할 수 있도록 합니다.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

unbounded_buffer 개체와 달리 receive 함수는 overwrite_buffer 개체에서 메시지를 제거하지 않습니다.공급자가 메시지를 덮어쓰기 전에 소비자가 해당 메시지 버퍼를 두 번 이상 읽으면 수신자는 매번 같은 메시지를 가져옵니다.

코드 컴파일

예제 코드를 복사 하 고 Visual Studio 프로젝트에 붙여 또는 라는 파일에 붙여 생산자 consumer.cpp 및 다음 Visual Studio 명령 프롬프트 창에서 다음 명령을 실행 합니다.

cl.exe /EHsc producer-consumer.cpp

참고 항목

개념

비동기 에이전트 라이브러리

비동기 에이전트

비동기 메시지 블록

메시지 전달 함수