Comment : implémenter divers modèles de producteur-consommateur

Cette rubrique explique comment implémenter le modèle producteur-consommateur dans votre application. Dans ce modèle, le producteur envoie des messages à un bloc de message et le consommateur lit les messages de ce bloc.

La rubrique présente deux scénarios. Dans le premier scénario, le consommateur doit recevoir chaque message envoyé par le producteur. Dans le deuxième scénario, le consommateur interroge régulièrement les données et n’a donc pas besoin de recevoir chaque message.

Les deux exemples de cette rubrique utilisent des agents, des blocs de messages et des fonctions de transmission de messages pour transmettre des messages du producteur au consommateur. L’agent producteur utilise la fonction concurrency ::send pour écrire des messages dans un objet concurrency ::ITarget . L’agent consommateur utilise la fonction concurrency ::receive pour lire les messages d’un objet concurrency ::ISource . Les deux agents contiennent une valeur sentinelle pour coordonner la fin du traitement.

Pour plus d’informations sur les agents asynchrones, consultez Agents asynchrones. Pour plus d’informations sur les blocs de messages et les fonctions de passage de messages, consultez Les blocs de messages asynchrones et les fonctions de transmission de messages.

Exemple : Envoyer une série de numéros à l’agent consommateur

Dans cet exemple, l’agent producteur envoie une série de nombres à l’agent consommateur. Le consommateur reçoit chacun de ces nombres et calcule sa moyenne. L’application écrit la moyenne dans la console.

Cet exemple utilise un objet concurrency ::unbounded_buffer pour permettre au producteur de mettre en file d’attente les messages. La unbounded_buffer classe implémente ITarget et ISource permet au producteur et au consommateur d’envoyer et de recevoir des messages vers et à partir d’une mémoire tampon partagée. Les send fonctions coordonnent receive la tâche de propagation des données du producteur au consommateur.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Cet exemple produit la sortie suivante.

The average is 50.

Exemple : Envoyer une série de guillemets boursiers à l’agent consommateur

Dans cet exemple, l’agent producteur envoie une série de guillemets boursiers à l’agent consommateur. L’agent consommateur lit régulièrement le devis actuel et l’imprime dans la console.

Cet exemple ressemble à celui précédent, sauf qu’il utilise un objet concurrency ::overwrite_buffer pour permettre au producteur de partager un message avec le consommateur. Comme dans l’exemple précédent, overwrite_buffer la classe implémente ITarget et ISource permet au producteur et au consommateur d’agir sur une mémoire tampon de message partagée.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Cet exemple génère l’exemple de sortie suivant.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Contrairement à un unbounded_buffer objet, la receive fonction ne supprime pas le message de l’objet overwrite_buffer . Si le consommateur lit à partir de la mémoire tampon du message plusieurs fois avant que le producteur remplace ce message, le destinataire obtient le même message à chaque fois.

Compilation du code

Copiez l’exemple de code et collez-le dans un projet Visual Studio, ou collez-le dans un fichier nommé producer-consumer.cpp , puis exécutez la commande suivante dans une fenêtre d’invite de commandes Visual Studio.

cl.exe /EHsc producer-consumer.cpp

Voir aussi

Bibliothèque d’agents asynchrones
Agents asynchrones
Blocs de messages asynchrones
Fonctions de passage de messages