Demonstra Passo a passo: Criar um bloco de mensagem personalizada

Este documento descreve como criar um tipo de bloco de mensagem personalizada que ordena as mensagens recebidas por prioridade.

Embora os tipos de bloco de mensagem interna fornecem um intervalo de toda a funcionalidade, você pode criar seu próprio tipo de bloco de mensagem e personalizá-lo para atender aos requisitos do seu aplicativo. Para obter uma descrição dos tipos de bloco de mensagem interna são fornecidos pela biblioteca de agentes assíncronos, consulte Blocos de mensagens assíncronas.

Pré-requisitos

Antes de começar este passo a passo, leia os seguintes documentos:

Seções

Esta explicação passo a passo contém as seções a seguir:

  • A criação de um bloco de mensagens personalizada

  • Definindo a classe de priority_buffer

  • O exemplo completo

A criação de um bloco de mensagens personalizada

Participarem de blocos de mensagem o ato de enviar e receber mensagens. Um bloco de mensagens envia mensagens é conhecido como um Bloco de origem. Um bloco de mensagens que recebe mensagens é conhecido como um Bloco de destino. Um bloco de mensagens que envia e recebe mensagens é conhecido como um Bloco de propagador. A biblioteca de agentes usa a classe abstrata Concurrency::ISource para representar blocos de código-fonte e a classe abstrata Concurrency::ITarget para representar blocos de destino. Esse ato de tipos de bloco de mensagens como fontes derivam de ISource; Esse ato de tipos de bloco de mensagens como destinos derivam de ITarget.

Embora você pode derivar do tipo de bloco de mensagem diretamente do ISource e ITarget, a biblioteca de agentes define três classes de base que realizam grande parte da funcionalidade que é comum a todos os bloqueios tipos de mensagens, por exemplo, tratamento de erros e conectando os blocos de mensagens em uma maneira segura de simultaneidade. O Concurrency::source_block classe deriva de ISource e envia mensagens para outros blocos. O Concurrency::target_block classe deriva de ITarget e recebe mensagens de outros blocos. O Concurrency::propagator_block classe deriva de ISource e ITarget e enviar mensagens a outros blocos e ele recebe mensagens de outros blocos. Recomendamos que você use esses três classes de base para lidar com os detalhes da infra-estrutura para que você possa se concentrar no comportamento do seu bloco de mensagem.

O source_block, target_block, e propagator_block classes são modelos que são parametrizados em um tipo que gerencia as conexões ou links entre os blocos de origem e de destino e em um tipo que gerencia como as mensagens são processadas. A biblioteca de agentes define dois tipos de realizem o gerenciamento do link, Concurrency::single_link_registry e Concurrency::multi_link_registry. O single_link_registry classe permite que um bloco de mensagem a ser vinculado a uma fonte ou para um destino. O multi_link_registry classe permite que um bloco de mensagens para serem vinculadas a várias fontes ou vários destinos. A biblioteca de agentes define uma classe que executa o gerenciamento de mensagens, Concurrency::ordered_message_processor. O ordered_message_processor classe permite que os blocos de mensagens processar mensagens na ordem em que ele recebe.

Para entender melhor como se relacionam os blocos de mensagem com suas origens e destinos, considere o exemplo a seguir. Este exemplo mostra a declaração da Concurrency::transformer classe.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

O transformer classe deriva de propagator_blocke, portanto, atua como um bloco de origem e um bloco de destino. Ele aceita mensagens do tipo _Input e envia mensagens do tipo _Output. O transformer Especifica a classe single_link_registry como o Gerenciador de link para os blocos de destino e multi_link_registry como o Gerenciador de link para blocos de origem. Portanto, um transformer objeto pode ter até um destino e um número ilimitado de fontes.

Uma classe que deriva de source_block deve implementar seis métodos: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message, e resume_propagation. Uma classe que deriva de target_block deve implementar a propagate_message método e opcionalmente pode implementar a send_message método. Derivando de propagator_block é funcionalmente equivalente à derivação de ambos source_block e target_block.

O propagate_to_any_targets método é chamado pelo tempo de execução assíncrona ou síncrona processar mensagens de entrada e propagadas quaisquer mensagens de saída. O accept_message método é chamado pelos blocos de destino para aceitar mensagens. Muitos tipos de bloco de mensagens, como unbounded_buffer, enviar mensagens somente para o primeiro destino que receberia o proprietário. Portanto, ele transfere a propriedade da mensagem para o destino. Bloco de mensagens de outro tipos, como Concurrency::overwrite_buffer, oferecem mensagens a cada um de seus blocos de destino. Portanto, overwrite_buffer cria uma cópia da mensagem para cada um dos destinos.

O reserve_message, consume_message, release_message, e resume_propagation métodos permitem blocos de mensagens participar de reserva de mensagem. Destino bloqueará a chamada a reserve_message método quando eles são oferecidos a uma mensagem e terá que reservar a mensagem para uso posterior. Depois de um bloco de destino reserva uma mensagem, ele pode chamar o consume_message método para consumir a mensagem ou o release_message método para cancelar a reserva. Como ocorre com o accept_message método, a implementação de consume_message pode qualquer propriedade de transferência da mensagem ou retornar a uma cópia da mensagem. Depois de um bloco de destino consome ou libera uma mensagem reservada, o tempo de execução chama o resume_propagation método. Normalmente, esse método continua a propagação de mensagem, começando com a próxima mensagem na fila.

As chamadas de tempo de execução de propagate_message método para transferir assincronamente uma mensagem a partir de outro bloco à atual. O send_message método semelhante ao propagate_message, exceto que ela de forma síncrona, em vez de forma assíncrona, envia a mensagem para os blocos de destino. A implementação padrão de send_message rejeita todas as mensagens de entrada. O runtime não chamar qualquer um desses métodos se a mensagem não passar a função de filtro opcional que está associada com o bloco de destino. Para obter mais informações sobre filtros de mensagens, consulte Blocos de mensagens assíncronas.

go to top

Definindo a classe de priority_buffer

O priority_buffer classe é um tipo de bloco de mensagem personalizada que ordena as mensagens recebidas pela primeira vez, por prioridade e, em seguida, pela ordem na qual as mensagens são recebidas. O priority_buffer classe semelhante a Concurrency::unbounded_buffer classe porque mantém uma fila de mensagens e também porque ele atua como uma origem e um bloco de mensagens de destino e pode ter várias fontes e vários destinos. No entanto, unbounded_buffer baseia a propagação de mensagem somente na ordem em que ele recebe mensagens de fontes.

O priority_buffer classe recebe as mensagens do tipo std::tuple que contêm PriorityType e Type elementos. PriorityTyperefere-se ao tipo que mantém a prioridade de cada mensagem; Typerefere-se a parte de dados da mensagem. O priority_buffer classe envia mensagens do tipo Type. O priority_buffer classe também gerencia duas filas de mensagens: um std::priority_queue o objeto para mensagens de entrada e um std::queue o objeto para mensagens de saída. Ordenação de mensagens por prioridade é útil quando um priority_buffer objeto recebe várias mensagens simultaneamente ou quando ele recebe várias mensagens antes de todas as mensagens são lidas pelos consumidores.

Além de para sete métodos que uma classe que deriva de propagator_block deve implementar, o priority_buffer classe também substitui o link_target_notification e send_message métodos. O priority_buffer classe também define dois métodos públicos de auxiliar, enqueue e dequeuee um método auxiliar particular, propagate_priority_order.

O procedimento a seguir descreve como implementar a priority_buffer classe.

Para definir a classe priority_buffer

  1. Crie um arquivo de cabeçalho do C++ e nomeie- priority_buffer.h. Como alternativa, você pode usar um arquivo de cabeçalho existente é parte do seu projeto.

  2. Em priority_buffer.h, adicione o seguinte código.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. No std namespace, definir especializações de std::less e std::greater que atuam em Concurrency::message objetos.

    namespace std 
    {
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
    {  
       typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator< to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) < get<0>(right->payload));
       }
    };
    
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
    {  
       typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator> to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) > get<0>(right->payload));
       }
    };
    
    }
    

    O priority_buffer classe armazena message objetos em um priority_queue objeto. Estas especializações tipo permitem que a fila de prioridade classificar as mensagens de acordo com sua prioridade. A prioridade é o primeiro elemento da tuple objeto.

  4. No Concurrency namespace, declarar a priority_buffer classe.

    namespace Concurrency 
    {
    template<class Type, 
             typename PriorityType = int,
             typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : 
       public propagator_block<multi_link_registry<ITarget<Type>>,
                               multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
    protected:
    private:
    };
    }
    

    O priority_buffer classe deriva de propagator_block. Portanto, ele pode enviar e receber mensagens. O priority_buffer classe pode ter vários destinos que recebem as mensagens do tipo Type. Ele também pode ter várias origens que enviam mensagens do tipo tuple<PriorityType, Type>.

  5. No private seção a priority_buffer classe, adicione as seguintes variáveis de membro.

    // Stores incoming messages. 
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
       message<_Source_type>*, 
       std::vector<message<_Source_type>*>, 
       Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<message<_Target_type>*> _output_messages;
    

    O priority_queue objeto contém mensagens de entrada; o queue objeto contém mensagens de saída. A priority_buffer objeto pode receber várias mensagens simultaneamente; o critical_section objeto sincroniza o acesso à fila de mensagens de entrada.

  6. No private seção, defina o construtor de cópia e o operador de atribuição. Isso impede que priority_queue objetos sejam atribuível.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. No public seção, defina os construtores que são comuns a muitos tipos de bloco de mensagem. Também defina o destruidor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {       
       initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
       initialize_source_and_target();
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler)
    {
       initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler, filter_method const& filter) 
    {
       initialize_source_and_target(&scheduler);
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group)
    {
       initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
    {
       initialize_source_and_target(NULL, &schedule_group);
       register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
       // Remove all links.
       remove_network_links();
    }
    
  8. No public seção, definir os métodos enqueue e dequeue. Esses métodos auxiliares fornecem uma maneira alternativa para enviar mensagens para e receber mensagens de um priority_buffer objeto.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
      return Concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
      return receive<Type>(this);
    }
    
  9. No protected seção, defina a propagate_to_any_targets método.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(message<_Target_type>*)
    {
       // Retrieve the message from the front of the input queue.
       message<_Source_type>* input_message = NULL;
       {
          critical_section::scoped_lock lock(_input_lock);
          if (_input_messages.size() > 0)
          {
             input_message = _input_messages.top();
             _input_messages.pop();
          }
       }
    
       // Move the message to the output queue.
       if (input_message != NULL)
       {
          // The payload of the output message does not contain the 
          // priority of the message.
          message<_Target_type>* output_message = 
             new message<_Target_type>(get<1>(input_message->payload));
          _output_messages.push(output_message);
    
          // Free the memory for the input message.
          delete input_message;
    
          // Do not propagate messages if the new message is not the head message.
          // In this case, the head message is reserved by another message block.
          if (_output_messages.front()->msg_id() != output_message->msg_id())
          {
             return;
          }
       }
    
       // Propagate out the output messages.
       propagate_priority_order();
    }
    

    O propagate_to_any_targets método transfere a mensagem que está na frente da fila de entrada para a fila de saída e propaga todas as mensagens na fila de saída.

  10. No protected seção, defina a accept_message método.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
    {        
       message<_Target_type>* message = NULL;
    
       // Transfer ownership if the provided message identifier matches
       // the identifier of the front of the output message queue.
       if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
       {
          message = _output_messages.front();            
          _output_messages.pop();
       }
    
       return message;
    }
    

    Quando um bloco de destino chama o accept_message método, o priority_buffer propriedade de transferências de classe da mensagem para o primeiro bloco de destino que aceita o proprietário. (Isso lembra o comportamento de unbounded_buffer.)

  11. No protected seção, defina a reserve_message método.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(runtime_object_identity msg_id)
    {
       // Allow the message to be reserved if the provided message identifier
       // is the message identifier of the front of the message queue.
       return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
    }
    

    O priority_buffer classe permite que um bloco de destino para reservar uma mensagem quando o identificador de mensagem fornecida com o identificador da mensagem que está na frente da fila. Em outras palavras, um destino pode reservar a mensagem se a priority_buffer objeto ainda não recebeu uma mensagem adicionais e ainda não tenha propagada out atual.

  12. No protected seção, defina a consume_message método.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual message<Type>* consume_message(runtime_object_identity msg_id)
    {
       // Transfer ownership of the message to the caller.
       return accept_message(msg_id);
    }
    

    Um bloco de destino chama consume_message para transferir a propriedade da mensagem que ele reservados.

  13. No protected seção, defina a release_message método.

    // Releases a previous message reservation.
    virtual void release_message(runtime_object_identity msg_id)
    {
       // The head message must be the one that is reserved. 
       if (_output_messages.empty() || 
           _output_messages.front()->msg_id() != msg_id)
       {
          throw message_not_found();
       }
    }
    

    Um bloco de destino chama release_message para cancelar sua reserva a mensagem.

  14. No protected seção, defina a resume_propagation método.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
       // Propagate out any messages in the output queue.
       if (_output_messages.size() > 0)
       {
          async_send(NULL);
       }
    }
    

    As chamadas de tempo de execução resume_propagation depois de um bloco de destino consome ou libera uma mensagem reservada. Esse método propaga quaisquer mensagens que estão na fila de saída.

  15. No protected seção, defina a link_target_notification método.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(ITarget<_Target_type>*)
    {
       // Do not propagate messages if a target block reserves
       // the message at the front of the queue.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out any messages that are in the output queue.
       propagate_priority_order();
    }
    

    O _M_pReservedFor a variável de membro é definido pela classe base, source_block. Essa variável de membro aponta para o bloco de destino, se houver, que está mantendo uma reserva para a mensagem que está na frente da fila de saída. As chamadas de tempo de execução link_target_notification quando um novo destino está vinculado a priority_buffer objeto. Esse método propaga quaisquer mensagens que estão na fila de saída, se nenhum destino está mantendo uma reserva.

  16. No private seção, defina a propagate_priority_order método.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
       // Cancel propagation if another block reserves the head message.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out all output messages. 
       // Because this block preserves message ordering, stop propagation
       // if any of the messages are not accepted by a target block.
       while (!_output_messages.empty())
       {
          // Get the next message.
          message<_Target_type> * message = _output_messages.front();
    
          message_status status = declined;
    
          // Traverse each target in the order in which they are connected.
          for (target_iterator iter = _M_connectedTargets.begin(); 
               *iter != NULL; 
               ++iter)
          {
             // Propagate the message to the target.
             ITarget<_Target_type>* target = *iter;
             status = target->propagate(message, this);
    
             // If the target accepts the message then ownership of message has 
             // changed. Do not propagate this message to any other target.
             if (status == accepted)
             {
                break;
             }
    
             // If the target only reserved this message, we must wait until the 
             // target accepts the message.
             if (_M_pReservedFor != NULL)
             {
                break;
             }
          }
    
          // If status is anything other than accepted, then the head message
          // was not propagated out. To preserve the order in which output 
          // messages are propagated, we must stop propagation until the head 
          // message is accepted.
          if (status != accepted)
          {
              break;
          }          
       }
    }
    

    Esse método se propaga para fora de todas as mensagens da fila de saída. Cada mensagem na fila é oferecida para cada bloco de destino até que um dos blocos de destino aceita a mensagem. O priority_buffer classe preserva a ordem das mensagens de saída. Portanto, a primeira mensagem na fila de saída deve ser aceito por um bloco de destino antes deste método oferece a qualquer outra mensagem para os blocos de destino.

  17. No protected seção, defina a propagate_message método.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual message_status propagate_message(message<_Source_type>* message, 
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Asynchronously send the message to the target blocks.
          async_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    O propagate_message permite que o método de priority_buffer classe para atuar como um receptor da mensagem, ou de destino. Esse método recebe a mensagem que é oferecida pelo bloco de origem fornecido e insere a mensagem na fila de prioridade. O propagate_message , em seguida, o método assincronamente envia todas as mensagens de saída para blocos de destino.

    O tempo de execução chama esse método quando você chamar o Concurrency::asend de função ou quando o bloco de mensagens está conectado a outros blocos de mensagem.

  18. No protected seção, defina a send_message método.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual message_status send_message(message<_Source_type>* message,
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Synchronously send the message to the target blocks.
          sync_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    O send_message método semelhante ao propagate_message. No entanto, ele envia as mensagens de saída síncrona em vez de forma assíncrona.

    O tempo de execução chama esse método durante uma operação de envio síncrono, como, por exemplo, quando você chama o Concurrency::send função.

O priority_buffer classe contém sobrecargas de construtor, que são comuns em muitos tipos de bloco de mensagem. Alguns construtor sobrecargas take Concurrency::Scheduler ou Concurrency::ScheduleGroup objetos, que permitem que o bloco de mensagem a ser gerenciado por um Agendador de tarefas específicas. Outras sobrecargas do construtor levam a uma função de filtro. As funções de filtro permitem blocos de mensagem aceitar ou rejeitar uma mensagem na Base de sua carga. Para obter mais informações sobre filtros de mensagens, consulte Blocos de mensagens assíncronas. Para obter mais informações sobre os agendadores de tarefa, consulte Agendador de tarefas (Runtime de simultaneidade).

Porque o priority_buffer classe orders mensagens por prioridade e, em seguida, pela ordem na qual as mensagens são recebidas, essa classe é mais útil quando ele recebe mensagens de forma assíncrona, por exemplo, quando você chamar o Concurrency::asend de função ou quando o bloco de mensagens está conectado a outros blocos de mensagem.

go to top

O exemplo completo

O exemplo a seguir mostra a definição completa do priority_buffer classe.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
// A specialization of less that tests whether the priority element of a 
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
{  
   typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator< to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) < get<0>(right->payload));
   }
};

// A specialization of less that tests whether the priority element of a 
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
{  
   typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator> to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) > get<0>(right->payload));
   }
};

}

namespace Concurrency 
{
// A message block type that orders incoming messages first by priority, 
// and then by the order in which messages are received. 
template<class Type, 
         typename PriorityType = int,
         typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : 
   public propagator_block<multi_link_registry<ITarget<Type>>,
                           multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{  
public:
   // Constructs a priority_buffer message block.
   priority_buffer() 
   {       
      initialize_source_and_target();
   }

   // Constructs a priority_buffer message block with the given filter function.
   priority_buffer(filter_method const& filter)
   {
      initialize_source_and_target();
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler)
   {
      initialize_source_and_target(&scheduler);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler, filter_method const& filter) 
   {
      initialize_source_and_target(&scheduler);
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group)
   {
      initialize_source_and_target(NULL, &schedule_group);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
   {
      initialize_source_and_target(NULL, &schedule_group);
      register_filter(filter);
   }

   // Destroys the message block.
   ~priority_buffer()
   {
      // Remove all links.
      remove_network_links();
   }

   // Sends an item to the message block.
   bool enqueue(Type const& item)
   {
     return Concurrency::asend<Type>(this, item);
   }

   // Receives an item from the message block.
   Type dequeue()
   {
     return receive<Type>(this);
   }

protected:
   // Asynchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::propagate.
   virtual message_status propagate_message(message<_Source_type>* message, 
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Asynchronously send the message to the target blocks.
         async_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Synchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::send.
   virtual message_status send_message(message<_Source_type>* message,
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Synchronously send the message to the target blocks.
         sync_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Accepts a message that was offered by this block by transferring ownership
   // to the caller.
   virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
   {        
      message<_Target_type>* message = NULL;

      // Transfer ownership if the provided message identifier matches
      // the identifier of the front of the output message queue.
      if (!_output_messages.empty() && 
           _output_messages.front()->msg_id() == msg_id)
      {
         message = _output_messages.front();            
         _output_messages.pop();
      }

      return message;
   }

   // Reserves a message that was previously offered by this block.
   virtual bool reserve_message(runtime_object_identity msg_id)
   {
      // Allow the message to be reserved if the provided message identifier
      // is the message identifier of the front of the message queue.
      return (!_output_messages.empty() && 
               _output_messages.front()->msg_id() == msg_id);
   }

   // Transfers the message that was previously offered by this block 
   // to the caller. The caller of this method is the target block that 
   // reserved the message.
   virtual message<Type>* consume_message(runtime_object_identity msg_id)
   {
      // Transfer ownership of the message to the caller.
      return accept_message(msg_id);
   }

   // Releases a previous message reservation.
   virtual void release_message(runtime_object_identity msg_id)
   {
      // The head message must be the one that is reserved. 
      if (_output_messages.empty() || 
          _output_messages.front()->msg_id() != msg_id)
      {
         throw message_not_found();
      }
   }

   // Resumes propagation after a reservation has been released.
   virtual void resume_propagation()
   {
      // Propagate out any messages in the output queue.
      if (_output_messages.size() > 0)
      {
         async_send(NULL);
      }
   }

   // Notifies this block that a new target has been linked to it.
   virtual void link_target_notification(ITarget<_Target_type>*)
   {
      // Do not propagate messages if a target block reserves
      // the message at the front of the queue.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out any messages that are in the output queue.
      propagate_priority_order();
   }

   // Transfers the message at the front of the input queue to the output queue
   // and propagates out all messages in the output queue.
   virtual void propagate_to_any_targets(message<_Target_type>*)
   {
      // Retrieve the message from the front of the input queue.
      message<_Source_type>* input_message = NULL;
      {
         critical_section::scoped_lock lock(_input_lock);
         if (_input_messages.size() > 0)
         {
            input_message = _input_messages.top();
            _input_messages.pop();
         }
      }

      // Move the message to the output queue.
      if (input_message != NULL)
      {
         // The payload of the output message does not contain the 
         // priority of the message.
         message<_Target_type>* output_message = 
            new message<_Target_type>(get<1>(input_message->payload));
         _output_messages.push(output_message);

         // Free the memory for the input message.
         delete input_message;

         // Do not propagate messages if the new message is not the head message.
         // In this case, the head message is reserved by another message block.
         if (_output_messages.front()->msg_id() != output_message->msg_id())
         {
            return;
         }
      }

      // Propagate out the output messages.
      propagate_priority_order();
   }

private:

   // Propagates messages in priority order.
   void propagate_priority_order()
   {
      // Cancel propagation if another block reserves the head message.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out all output messages. 
      // Because this block preserves message ordering, stop propagation
      // if any of the messages are not accepted by a target block.
      while (!_output_messages.empty())
      {
         // Get the next message.
         message<_Target_type> * message = _output_messages.front();

         message_status status = declined;

         // Traverse each target in the order in which they are connected.
         for (target_iterator iter = _M_connectedTargets.begin(); 
              *iter != NULL; 
              ++iter)
         {
            // Propagate the message to the target.
            ITarget<_Target_type>* target = *iter;
            status = target->propagate(message, this);

            // If the target accepts the message then ownership of message has 
            // changed. Do not propagate this message to any other target.
            if (status == accepted)
            {
               break;
            }

            // If the target only reserved this message, we must wait until the 
            // target accepts the message.
            if (_M_pReservedFor != NULL)
            {
               break;
            }
         }

         // If status is anything other than accepted, then the head message
         // was not propagated out. To preserve the order in which output 
         // messages are propagated, we must stop propagation until the head 
         // message is accepted.
         if (status != accepted)
         {
             break;
         }          
      }
   }

private:

   // Stores incoming messages. 
   // The type parameter Pr specifies how to order messages by priority.
   std::priority_queue<
      message<_Source_type>*, 
      std::vector<message<_Source_type>*>, 
      Pr
   > _input_messages;

   // Synchronizes access to the input message queue.
   critical_section _input_lock;

   // Stores outgoing messages.
   std::queue<message<_Target_type>*> _output_messages;

private:
   // Hide assignment operator and copy constructor.
   priority_buffer const &operator =(priority_buffer const&);
   priority_buffer(priority_buffer const &);
};

}

O exemplo a seguir simultaneamente realiza uma série de asend e Concurrency::receive operações em um priority_buffer objeto.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Este exemplo produz a saída de exemplo a seguir.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

O priority_buffer classe orders mensagens primeiro pela prioridade e, em seguida, pela ordem em que ele recebe mensagens. Neste exemplo, as mensagens com maior prioridade numérica são inseridas em direção à frente da fila.

go to top

Compilando o código

Copie o código de exemplo e colá-lo em um Visual Studio de projeto ou cole a definição da priority_buffer classe em um arquivo que é chamado priority_buffer.h e o programa de teste em um arquivo chamado priority_buffer.cpp e, em seguida, execute o seguinte comando um Visual Studio 2010 janela do Prompt de comando.

cl.exe /EHsc priority_buffer.cpp

Consulte também

Conceitos

Orientações de Runtime de simultaneidade

Blocos de mensagens assíncronas

Funções de transmissão de mensagens