Procedura dettagliata: creazione di un agente del flusso di dati

In questo documento viene illustrato come creare le applicazioni basate sugli agenti in base al flusso di dati, anziché al flusso di controllo.

Il flusso di controllo si riferisce all'ordine di esecuzione delle operazioni in un programma.Il flusso di controllo viene regolato mediante strutture di controllo come istruzioni condizionali, cicli e così via.In alternativa, il flusso di dati si riferisce a un modello di programmazione in cui i calcoli vengono eseguiti solo quando tutti i dati richiesti sono disponibili.Il modello di programmazione del flusso di dati è correlato al concetto di passaggio dei messaggi, in cui i componenti indipendenti di un programma comunicano con un altro programma inviando messaggi.

Gli agenti asincroni supportano entrambi i modelli di programmazione del flusso di dati e del flusso di controllo.Sebbene il modello del flusso di controllo sia appropriato in molti casi, il modello del flusso di dati risulta appropriato in altri, ad esempio quando un agente riceve i dati ed esegue un'azione basata sul payload di tali dati.

Prerequisiti

Prima di iniziare questa procedura dettagliata, leggere i documenti riportati di seguito.

Sezioni

In questa procedura dettagliata sono contenute le sezioni seguenti:

  • Creazione di un agente di base del flusso di controllo

  • Creazione di un agente di base del flusso di dati

  • Creazione di un agente di registrazione dei messaggi

Creazione di un agente di base del flusso di controllo

Si consideri l'esempio seguente che definisce la classe control_flow_agent.La classe control_flow_agent agisce su tre buffer dei messaggi: un buffer di input e due buffer di output.Il metodo run legge i dati dal buffer dei messaggi di origine in un ciclo e utilizza un'istruzione condizionale per indirizzare il flusso di esecuzione del programma.L'agente incrementa un contatore per valori negativi diversi da zero e incrementa un altro contatore per valori positivi diversi da zero.Dopo aver ricevuto il valore sentinel zero, l'agente invia i valori dei contatori nel buffer dei messaggi di output.I metodi positives e negatives consentono all'applicazione di leggere i conteggi dei valori negativi e positivi dall'agente.

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

Sebbene in questo esempio venga illustrato l'utilizzo di base del flusso di controllo in un agente, viene indicata la natura seriale della programmazione basata sul flusso di controllo.Ogni messaggio deve essere elaborato in sequenza, anche se nel buffer dei messaggi di input potrebbero essere disponibili più messaggi.Il modello del flusso di dati consente la valutazione simultanea di entrambi i rami dell'istruzione condizionale.Il modello del flusso di dati consente inoltre di creare reti di messaggistica più complesse che agiscono sui dati man mano che diventano disponibili.

Top

Creazione di un agente di base del flusso di dati

In questa sezione viene illustrato come convertire la classe control_flow_agent in modo da utilizzare il modello del flusso di dati per eseguire la stessa attività.

L'agente del flusso di dati prevede la creazione di una rete di buffer dei messaggi, ciascuno dei quali viene utilizzato per uno scopo specifico.Alcuni blocchi di messaggi utilizzano una funzione di filtro per accettare o rifiutare un messaggio in base al relativo payload.Le funzioni di filtro garantiscono che un blocco di messaggi riceva solo determinati valori.

Per convertire l'agente del flusso di controllo in un agente del flusso di dati

  1. Copiare il corpo della classe control_flow_agent in un'altra classe, ad esempio dataflow_agent.In alternativa, è possibile rinominare la classe control_flow_agent.

  2. Rimuovere il corpo del ciclo che chiama receive dal metodo run.

    void run()
    {
       // Counts the number of negative and positive values that
       // the agent receives.
       size_t negative_count = 0;
       size_t positive_count = 0;
    
    
       // Write the counts to the message buffers.
       send(_negatives, negative_count);
       send(_positives, positive_count);
    
       // Set the agent to the completed state.
       done();
    }
    
  3. Dopo l'inizializzazione delle variabili negative_count e positive_count aggiungere nel metodo run un oggetto countdown_event che tiene traccia del conteggio delle operazioni attive.

    // Tracks the count of active operations.
    countdown_event active;
    // An event that is set by the sentinel.
    event received_sentinel;
    

    La classe countdown_event viene illustrata più avanti in questo argomento.

  4. Creare gli oggetti del buffer dei messaggi che prenderanno parte alla rete del flusso di dati.

    //
    // Create the members of the dataflow network.
    //
    
    // Increments the active counter.
    transformer<int, int> increment_active(
       [&active](int value) -> int {
          active.add_count();
          return value;
       });
    
    // Increments the count of negative values.
    call<int> negatives(
       [&](int value) {
          ++negative_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value < 0;
       });
    
    // Increments the count of positive values.
    call<int> positives(
       [&](int value) {
          ++positive_count;
          // Decrement the active counter.
          active.signal();
       },
       [](int value) -> bool {
          return value > 0;
       });
    
    // Receives only the sentinel value of 0.
    call<int> sentinel(
       [&](int value) {            
          // Decrement the active counter.
          active.signal();
          // Set the sentinel event.
          received_sentinel.set();
       },
       [](int value) { 
          return value == 0; 
       });
    
    // Connects the _source message buffer to the rest of the network.
    unbounded_buffer<int> connector;
    
  5. Connettere i buffer dei messaggi per formare una rete.

    //
    // Connect the network.
    //
    
    // Connect the internal nodes of the network.
    connector.link_target(&negatives);
    connector.link_target(&positives);
    connector.link_target(&sentinel);
    increment_active.link_target(&connector);
    
    // Connect the _source buffer to the internal network to 
    // begin data flow.
    _source.link_target(&increment_active);
    
  6. Attendere l'impostazione degli oggetti countdown event e event.Questi eventi segnalano che l'agente ha ricevuto il valore sentinel e che tutte le operazioni sono state completate.

    // Wait for the sentinel event and for all operations to finish.
    received_sentinel.wait();
    active.wait();
    

Nel diagramma seguente viene illustrata la rete del flusso di dati completa per la classe dataflow_agent:

Rete del flusso di dati

Nella tabella seguente sono descritti i membri della rete.

Membro

Descrizione

increment_active

A concurrency::transformer oggetto che incrementa il contatore eventi attivi e passa il valore di input per il resto della rete.

negatives, positives

Concurrency::Call oggetti incrementare il numero di numeri e decrementa il contatore eventi attivo.Gli oggetti utilizzano ciascuno un filtro per accettare i numeri negativi o i numeri positivi.

sentinel

A concurrency::call oggetto che accetta solo il valore sentinel uguale a zero e decrementa il contatore eventi attivi.

connector

A concurrency::unbounded_buffer oggetto che si connette il buffer del messaggio di origine per la rete interna.

Poiché il metodo run viene chiamato in un thread separato, gli altri thread possono inviare messaggi alla rete prima che la rete sia completamente connessa.Il membro dati _source è un oggetto unbounded_buffer che memorizza nel buffer l'input inviato dall'applicazione all'agente.Per assicurarsi che la rete elabori tutti i messaggi di input, l'agente collega innanzitutto i nodi interni della rete e quindi collega l'inizio di tale rete, connector, al membro dati _source.In questo modo si garantisce che i messaggi non vengano elaborati durante la preparazione della rete.

Poiché la rete in questo esempio è basata sul flusso di dati anziché sul flusso di controllo, la rete deve comunicare all'agente di aver completato l'elaborazione di ogni valore di input e che il nodo sentinel ha ricevuto il relativo valore.In questo esempio viene utilizzato un countdown_event oggetto per segnalare che sono stati elaborati tutti i valori di input e un concurrency::event oggetto per indicare che il nodo sentinel ha ricevuto il relativo valore.La classe countdown_event utilizza un oggetto event per segnalare quando un valore del contatore raggiunge lo zero.L'intestazione della rete del flusso di dati incrementa il contatore ogni volta che riceve un valore.Ogni nodo terminale della rete decrementa il contatore dopo l'elaborazione del valore di input.Dopo aver preparato la rete del flusso di dati, l'agente attende che il nodo sentinel imposti l'oggetto event e che l'oggetto countdown_event segnali che il contatore ha raggiunto lo zero.

Nell'esempio seguente vengono illustrate le classi control_flow_agent, dataflow_agent e countdown_event.La funzione wmain crea un oggetto control_flow_agent e dataflow_agent e utilizza la funzione send_values per inviare una serie di valori casuali agli agenti.

// dataflow-agent.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <iostream>
#include <random>

using namespace concurrency;
using namespace std;

// A basic agent that uses control-flow to regulate the order of program 
// execution. This agent reads numbers from a message buffer and counts the 
// number of positive and negative values.
class control_flow_agent : public agent
{
public:
   explicit control_flow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Read from the source buffer until we receive
      // the sentinel value of 0.
      int value = 0;      
      while ((value = receive(_source)) != 0)
      {
         // Send negative values to the first target and
         // non-negative values to the second target.
         if (value < 0)
            ++negative_count;
         else
            ++positive_count;
      }

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }
private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L)
      : _current(static_cast<long>(count)) 
   {
      // Set the event if the initial count is zero.
      if (_current == 0L)
         _event.set();
   }

   // Decrements the event counter.
   void signal() {
      if(InterlockedDecrement(&_current) == 0L) {
         _event.set();
      }
   }

   // Increments the event counter.
   void add_count() {
      if(InterlockedIncrement(&_current) == 1L) {
         _event.reset();
      }
   }

   // Blocks the current context until the event is set.
   void wait() {
      _event.wait();
   }

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

// A basic agent that resembles control_flow_agent, but uses uses dataflow to 
// perform computations when data becomes available.
class dataflow_agent : public agent
{
public:
   dataflow_agent(ISource<int>& source)
      : _source(source)
   {
   }

   // Retrieves the count of negative numbers that the agent received.
   size_t negatives() 
   {
      return receive(_negatives);
   }

   // Retrieves the count of positive numbers that the agent received.
   size_t positives()
   {
      return receive(_positives);
   }

protected:
   void run()
   {
      // Counts the number of negative and positive values that
      // the agent receives.
      size_t negative_count = 0;
      size_t positive_count = 0;

      // Tracks the count of active operations.
      countdown_event active;
      // An event that is set by the sentinel.
      event received_sentinel;

      //
      // Create the members of the dataflow network.
      //

      // Increments the active counter.
      transformer<int, int> increment_active(
         [&active](int value) -> int {
            active.add_count();
            return value;
         });

      // Increments the count of negative values.
      call<int> negatives(
         [&](int value) {
            ++negative_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value < 0;
         });

      // Increments the count of positive values.
      call<int> positives(
         [&](int value) {
            ++positive_count;
            // Decrement the active counter.
            active.signal();
         },
         [](int value) -> bool {
            return value > 0;
         });

      // Receives only the sentinel value of 0.
      call<int> sentinel(
         [&](int value) {            
            // Decrement the active counter.
            active.signal();
            // Set the sentinel event.
            received_sentinel.set();
         },
         [](int value) { 
            return value == 0; 
         });

      // Connects the _source message buffer to the rest of the network.
      unbounded_buffer<int> connector;

      //
      // Connect the network.
      //

      // Connect the internal nodes of the network.
      connector.link_target(&negatives);
      connector.link_target(&positives);
      connector.link_target(&sentinel);
      increment_active.link_target(&connector);

      // Connect the _source buffer to the internal network to 
      // begin data flow.
      _source.link_target(&increment_active);

      // Wait for the sentinel event and for all operations to finish.
      received_sentinel.wait();
      active.wait();

      // Write the counts to the message buffers.
      send(_negatives, negative_count);
      send(_positives, positive_count);

      // Set the agent to the completed state.
      done();
   }

private:
   // Source message buffer to read from.
   ISource<int>& _source;

   // Holds the number of negative and positive numbers that the agent receives.
   single_assignment<size_t> _negatives;
   single_assignment<size_t> _positives;
};

// Sends a number of random values to the provided message buffer.
void send_values(ITarget<int>& source, int sentinel, size_t count)
{
   // Send a series of random numbers to the source buffer.
   mt19937 rnd(42);
   for (size_t i = 0; i < count; ++i)
   {
      // Generate a random number that is not equal to the sentinel value.
      int n;
      while ((n = rnd()) == sentinel);

      send(source, n);      
   }
   // Send the sentinel value.
   send(source, sentinel);   
}

int wmain()
{
   // Signals to the agent that there are no more values to process.
   const int sentinel = 0;
   // The number of samples to send to each agent.
   const size_t count = 1000000;

   // The source buffer that the application writes numbers to and 
   // the agents read numbers from.
   unbounded_buffer<int> source;

   //
   // Use a control-flow agent to process a series of random numbers.
   //
   wcout << L"Control-flow agent:" << endl;

   // Create and start the agent.
   control_flow_agent cf_agent(source);
   cf_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&cf_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << cf_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << cf_agent.positives() 
         << L" positive numbers."<< endl;  

   //
   // Perform the same task, but this time with a dataflow agent.
   //
   wcout << L"Dataflow agent:" << endl;

   // Create and start the agent.
   dataflow_agent df_agent(source);
   df_agent.start();

   // Send values to the agent.
   send_values(source, sentinel, count);

   // Wait for the agent to finish.
   agent::wait(&df_agent);

   // Print the count of negative and positive numbers.
   wcout << L"There are " << df_agent.negatives() 
         << L" negative numbers."<< endl;
   wcout << L"There are " << df_agent.positives() 
         << L" positive numbers."<< endl;
}

Questo esempio produce l'output seguente:

Control-flow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.
Dataflow agent:
There are 500523 negative numbers.
There are 499477 positive numbers.

Ff398051.collapse_all(it-it,VS.110).gifCompilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio o incollarlo in un file denominato agent.cpp di flusso dei dati e quindi eseguire il comando riportato di seguito in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc dataflow-agent.cpp

Top

Creazione di un agente di registrazione dei messaggi

Nell'esempio seguente viene illustrata la classe log_agent, analoga alla classe dataflow_agent.La classe log_agent implementa un agente di registrazione asincrona che scrive i messaggi di log in un file e sulla console.La classe log_agent consente all'applicazione di suddividere i messaggi in categorie: informativi, di avviso o di errore.Consente inoltre all'applicazione di specificare se ogni categoria del log viene scritta in un file, nella console o in entrambi.In questo esempio vengono scritti tutti i messaggi di log in un file e solo i messaggi di errore nella console.

// log-filter.cpp
// compile with: /EHsc 
#include <windows.h>
#include <agents.h>
#include <sstream>
#include <fstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
    countdown_event(unsigned int count = 0L)
        : _current(static_cast<long>(count)) 
    {
        // Set the event if the initial count is zero.
        if (_current == 0L)
        {
            _event.set();
        }
    }

    // Decrements the event counter.
    void signal()
    {
        if(InterlockedDecrement(&_current) == 0L)
        {
            _event.set();
        }
    }

    // Increments the event counter.
    void add_count()
    {
        if(InterlockedIncrement(&_current) == 1L)
        {
            _event.reset();
        }
    }

    // Blocks the current context until the event is set.
    void wait()
    {
        _event.wait();
    }

private:
    // The current count.
    volatile long _current;
    // The event that is set when the counter reaches zero.
    event _event;

    // Disable copy constructor.
    countdown_event(const countdown_event&);
    // Disable assignment.
    countdown_event const & operator=(countdown_event const&);
};

// Defines message types for the logger.
enum log_message_type
{
    log_info    = 0x1,
    log_warning = 0x2,
    log_error   = 0x4,
};

// An asynchronous logging agent that writes log messages to 
// file and to the console.
class log_agent : public agent
{
    // Holds a message string and its logging type.
    struct log_message
    {
        wstring message;
        log_message_type type;
    };

public:
    log_agent(const wstring& file_path, log_message_type file_messages, log_message_type console_messages)
        : _file(file_path)
        , _file_messages(file_messages)
        , _console_messages(console_messages)
        , _active(0)
    {
        if (_file.bad())
        {
            throw invalid_argument("Unable to open log file.");
        }
    }

    // Writes the provided message to the log.
    void log(const wstring& message, log_message_type type)
    {  
        // Increment the active message count.
        _active.add_count();

        // Send the message to the network.
        log_message msg = { message, type };
        send(_log_buffer, msg);
    }

    void close()
    {
        // Signal that the agent is now closed.
        _closed.set();
    }

protected:

    void run()
    {
        //
        // Create the dataflow network.
        //

        // Writes a log message to file.
        call<log_message> writer([this](log_message msg)
        {
            if ((msg.type & _file_messages) != 0)
            {
                // Write the message to the file.
                write_to_stream(msg, _file);
            }
            if ((msg.type & _console_messages) != 0)
            {
                // Write the message to the console.
                write_to_stream(msg, wcout);
            }
            // Decrement the active counter.
            _active.signal();
        });

        // Connect _log_buffer to the internal network to begin data flow.
        _log_buffer.link_target(&writer);

        // Wait for the closed event to be signaled.
        _closed.wait();

        // Wait for all messages to be processed.
        _active.wait();

        // Close the log file and flush the console.
        _file.close();
        wcout.flush();

        // Set the agent to the completed state.
        done();
    }

private:
    // Writes a logging message to the specified output stream.
    void write_to_stream(const log_message& msg, wostream& stream)
    {
        // Write the message to the stream.
        wstringstream ss;

        switch (msg.type)
        {
        case log_info:
            ss << L"info: ";
            break;
        case log_warning:
            ss << L"warning: ";
            break;
        case log_error:
            ss << L"error: ";
        }

        ss << msg.message << endl;
        stream << ss.str();
    }

private:   
    // The file stream to write messages to.
    wofstream _file;

    // The log message types that are written to file.
    log_message_type _file_messages;

    // The log message types that are written to the console.
    log_message_type _console_messages;

    // The head of the network. Propagates logging messages
    // to the rest of the network.
    unbounded_buffer<log_message> _log_buffer;

    // Counts the number of active messages in the network.
    countdown_event _active;

    // Signals that the agent has been closed.
    event _closed;
};

int wmain()
{
    // Union of all log message types.
    log_message_type log_all = log_message_type(log_info | log_warning  | log_error);

    // Create a logging agent that writes all log messages to file and error 
    // messages to the console.
    log_agent logger(L"log.txt", log_all, log_error);

    // Start the agent.
    logger.start();

    // Log a few messages.

    logger.log(L"===Logging started.===", log_info);

    logger.log(L"This is a sample warning message.", log_warning);
    logger.log(L"This is a sample error message.", log_error);

    logger.log(L"===Logging finished.===", log_info);

    // Close the logger and wait for the agent to finish.
    logger.close();
    agent::wait(&logger);
}

L'esempio scrive sulla console l'output seguente.

error: This is a sample error message.

Questo esempio crea inoltre il file log.txt, che contiene il testo seguente.

info: ===Logging started.===
warning: This is a sample warning message.
error: This is a sample error message.
info: ===Logging finished.===

Ff398051.collapse_all(it-it,VS.110).gifCompilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio o incollarlo in un file denominato Registro filter.cpp e quindi eseguire il comando riportato di seguito in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc log-filter.cpp

Top

Vedere anche

Altre risorse

Procedure dettagliate del runtime di concorrenza