Procédure pas à pas : création d'un bloc de message personnalisé

Ce document explique comment créer un type de bloc de messages personnalisé qui trie les messages entrants par priorité.

Bien que les types de blocs de messages intégrés fournissent un large éventail de fonctionnalités, vous pouvez créer votre propre type de bloc de message et le personnaliser pour répondre aux exigences de votre application. Pour obtenir une description des types de blocs de messages intégrés fournis par la bibliothèque d’agents asynchrones, consultez blocs de messages asynchrones.

Prérequis

Lisez les documents suivants avant de commencer cette procédure pas à pas :

Sections

Cette procédure pas à pas contient les sections suivantes :

Conception d’un bloc de messages personnalisé

Les blocs de messages participent à l’acte d’envoi et de réception de messages. Un bloc de messages qui envoie des messages est appelé bloc source. Un bloc de messages qui reçoit des messages est appelé bloc cible. Un bloc de messages qui envoie et reçoit des messages est appelé bloc de propagation. La bibliothèque agents utilise la concurrence de classe abstraite ::ISource pour représenter les blocs sources et la concurrence de classe abstraite ::ITarget pour représenter les blocs cibles. Types de blocs de message qui agissent en tant que sources dérivent ISource; les types de blocs de messages qui agissent en tant que cibles dérivent de ITarget.

Bien que vous puissiez dériver directement votre type de ISource bloc de message et ITarget, la bibliothèque d’agents définit trois classes de base qui effectuent une grande partie des fonctionnalités communes à tous les types de blocs de messages, par exemple, la gestion des erreurs et la connexion de blocs de messages ensemble d’une manière sécurisée par accès concurrentiel. La classe concurrency ::source_block dérive et ISource envoie des messages à d’autres blocs. La classe concurrency ::target_block dérive et ITarget reçoit des messages d’autres blocs. La classe concurrency ::p ropagator_block dérive et ISource ITarget envoie des messages à d’autres blocs et reçoit des messages d’autres blocs. Nous vous recommandons d’utiliser ces trois classes de base pour gérer les détails de l’infrastructure afin que vous puissiez vous concentrer sur le comportement de votre bloc de messages.

Les source_blockclasses et propagator_block les modèles target_blocksont des modèles paramétrés sur un type qui gère les connexions ou les liens, entre les blocs source et cible et sur un type qui gère le traitement des messages. La bibliothèque agents définit deux types qui effectuent la gestion des liens, concurrency ::single_link_registry et concurrency ::multi_link_registry. La single_link_registry classe permet à un bloc de message d’être lié à une source ou à une cible. La multi_link_registry classe permet à un bloc de message d’être lié à plusieurs sources ou à plusieurs cibles. La bibliothèque agents définit une classe qui effectue la gestion des messages, concurrency ::ordered_message_processor. La ordered_message_processor classe permet aux blocs de messages de traiter les messages dans l’ordre dans lequel il les reçoit.

Pour mieux comprendre comment les blocs de messages sont liés à leurs sources et cibles, tenez compte de l’exemple suivant. Cet exemple montre la déclaration de la classe concurrency ::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

La transformer classe dérive de propagator_block, et agit donc comme un bloc source et comme un bloc cible. Il accepte les messages de type _Input et envoie des messages de type _Output. La transformer classe spécifie single_link_registry comme gestionnaire de liens pour tous les blocs cibles et multi_link_registry comme gestionnaire de liens pour tous les blocs sources. Par conséquent, un transformer objet peut avoir jusqu’à une cible et un nombre illimité de sources.

Une classe dérivée de source_block doit implémenter six méthodes : propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message et resume_propagation. Une classe dérivée de target_block doit implémenter la méthode propagate_message et peut éventuellement implémenter la méthode send_message . La dérivation est propagator_block fonctionnellement équivalente à la dérivation des deux source_block et target_block.

La propagate_to_any_targets méthode est appelée par le runtime pour traiter de manière asynchrone ou synchrone tous les messages entrants et propager tous les messages sortants. La accept_message méthode est appelée par les blocs cibles pour accepter les messages. De nombreux types de blocs de messages, tels que unbounded_buffer, envoient des messages uniquement à la première cible qui le recevrait. Par conséquent, il transfère la propriété du message à la cible. D’autres types de blocs de messages, tels que concurrency ::overwrite_buffer, offrent des messages à chacun de ses blocs cibles. Par conséquent, overwrite_buffer crée une copie du message pour chacune de ses cibles.

Les reserve_messageméthodes , et les resume_propagation consume_messagerelease_messageméthodes permettent aux blocs de messages de participer à la réservation de messages. Les blocs cibles appellent la reserve_message méthode lorsqu’elles sont proposées à un message et doivent réserver le message pour une utilisation ultérieure. Une fois qu’un bloc cible réserve un message, il peut appeler la consume_message méthode pour consommer ce message ou la release_message méthode pour annuler la réservation. Comme avec la accept_message méthode, l’implémentation de consume_message peut transférer la propriété du message ou retourner une copie du message. Une fois qu’un bloc cible consomme ou libère un message réservé, le runtime appelle la resume_propagation méthode. En règle générale, cette méthode poursuit la propagation des messages, en commençant par le message suivant dans la file d’attente.

Le runtime appelle la propagate_message méthode pour transférer de façon asynchrone un message d’un autre bloc vers celui actuel. La send_message méthode ressemble propagate_message, sauf qu’elle est synchrone, au lieu de manière asynchrone, envoie le message aux blocs cibles. L’implémentation par défaut de send_message rejette tous les messages entrants. Le runtime n’appelle pas l’une de ces méthodes si le message ne transmet pas la fonction de filtre facultative associée au bloc cible. Pour plus d’informations sur les filtres de messages, consultez Blocs de messages asynchrones.

[Haut]

Définition de la classe priority_buffer

La priority_buffer classe est un type de bloc de messages personnalisé qui commande d’abord les messages entrants par priorité, puis par l’ordre dans lequel les messages sont reçus. La priority_buffer classe ressemble à la classe concurrency ::unbounded_buffer , car elle contient une file d’attente de messages, et également parce qu’elle agit à la fois comme une source et un bloc de messages cible et peut avoir à la fois plusieurs sources et plusieurs cibles. Toutefois, unbounded_buffer base la propagation des messages uniquement sur l’ordre dans lequel il reçoit des messages de ses sources.

La priority_buffer classe reçoit les messages de type std ::tuple qui contiennent et contiennent PriorityType des Type éléments. PriorityType fait référence au type qui contient la priorité de chaque message ; Type fait référence à la partie données du message. La priority_buffer classe envoie des messages de type Type. La priority_buffer classe gère également deux files d’attente de messages : un objet std ::p riority_queue pour les messages entrants et un objet std ::queue pour les messages sortants. L’ordre des messages par priorité est utile lorsqu’un priority_buffer objet reçoit plusieurs messages simultanément ou lorsqu’il reçoit plusieurs messages avant que les messages ne soient lus par les consommateurs.

Outre les sept méthodes qu’une classe dérivée de propagator_block doit implémenter, la priority_buffer classe remplace également les méthodes et send_message les link_target_notification méthodes. La priority_buffer classe définit également deux méthodes d’assistance publique et dequeueenqueue une méthode d’assistance privée. propagate_priority_order

La procédure suivante décrit comment implémenter la priority_buffer classe.

Pour définir la classe priority_buffer

  1. Créez un fichier d’en-tête C++ et nommez-le priority_buffer.h. Vous pouvez également utiliser un fichier d’en-tête existant qui fait partie de votre projet.

  2. Dans priority_buffer.h, ajoutez le code suivant.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. Dans l’espace std de noms, définissez des spécialisations de std ::less et std ::greater qui agissent sur les objets concurrency ::message .

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    La priority_buffer classe stocke des message objets dans un priority_queue objet. Ces spécialisations de type permettent à la file d’attente de priorité de trier les messages en fonction de leur priorité. La priorité est le premier élément de l’objet tuple .

  4. Dans l’espace concurrencyex de noms, déclarez la priority_buffer classe.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    La classe priority_buffer est dérivée de propagator_block. Par conséquent, il peut envoyer et recevoir des messages. La priority_buffer classe peut avoir plusieurs cibles qui reçoivent des messages de type Type. Il peut également avoir plusieurs sources qui envoient des messages de type tuple<PriorityType, Type>.

  5. Dans la private section de la priority_buffer classe, ajoutez les variables membres suivantes.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    L’objet priority_queue contient des messages entrants ; l’objet queue contient des messages sortants. Un priority_buffer objet peut recevoir plusieurs messages simultanément ; l’objet critical_section synchronise l’accès à la file d’attente des messages d’entrée.

  6. Dans la private section, définissez le constructeur de copie et l’opérateur d’affectation. Cela empêche les priority_queue objets d’être assignables.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. Dans la public section, définissez les constructeurs qui sont communs à de nombreux types de blocs de messages. Définissez également le destructeur.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. Dans la public section, définissez les méthodes enqueue et dequeue. Ces méthodes d’assistance offrent une autre façon d’envoyer des messages à un priority_buffer objet et de les recevoir.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. Dans la protected section, définissez la propagate_to_any_targets méthode.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    La propagate_to_any_targets méthode transfère le message qui se trouve à l’avant de la file d’attente d’entrée vers la file d’attente de sortie et propage tous les messages dans la file d’attente de sortie.

  10. Dans la protected section, définissez la accept_message méthode.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Lorsqu’un bloc cible appelle la accept_message méthode, la priority_buffer classe transfère la propriété du message au premier bloc cible qui l’accepte. (Cela ressemble au comportement de unbounded_buffer.)

  11. Dans la protected section, définissez la reserve_message méthode.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    La priority_buffer classe permet à un bloc cible de réserver un message lorsque l’identificateur de message fourni correspond à l’identificateur du message qui se trouve à l’avant de la file d’attente. En d’autres termes, une cible peut réserver le message si l’objet priority_buffer n’a pas encore reçu de message supplémentaire et n’a pas encore propagé le message actuel.

  12. Dans la protected section, définissez la consume_message méthode.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Un bloc cible appelle consume_message pour transférer la propriété du message qu’il a réservé.

  13. Dans la protected section, définissez la release_message méthode.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Un bloc cible appelle release_message pour annuler sa réservation à un message.

  14. Dans la protected section, définissez la resume_propagation méthode.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Le runtime appelle resume_propagation une fois qu’un bloc cible consomme ou libère un message réservé. Cette méthode propage tous les messages qui se trouvent dans la file d’attente de sortie.

  15. Dans la protected section, définissez la link_target_notification méthode.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    La _M_pReservedFor variable membre est définie par la classe de base. source_block Cette variable membre pointe vers le bloc cible, le cas échéant, qui contient une réservation au message qui se trouve à l’avant de la file d’attente de sortie. Le runtime appelle link_target_notification lorsqu’une nouvelle cible est liée à l’objet priority_buffer . Cette méthode propage tous les messages qui se trouvent dans la file d’attente de sortie si aucune cible ne contient de réservation.

  16. Dans la private section, définissez la propagate_priority_order méthode.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Cette méthode propage tous les messages de la file d’attente de sortie. Chaque message de la file d’attente est proposé à chaque bloc cible jusqu’à ce qu’un des blocs cibles accepte le message. La priority_buffer classe conserve l’ordre des messages sortants. Par conséquent, le premier message de la file d’attente de sortie doit être accepté par un bloc cible avant que cette méthode n’offre tout autre message aux blocs cibles.

  17. Dans la protected section, définissez la propagate_message méthode.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    La propagate_message méthode permet à la priority_buffer classe d’agir en tant que récepteur de messages ou cible. Cette méthode reçoit le message proposé par le bloc source fourni et insère ce message dans la file d’attente prioritaire. La propagate_message méthode envoie ensuite de manière asynchrone tous les messages de sortie aux blocs cibles.

    Le runtime appelle cette méthode lorsque vous appelez la fonction concurrency ::asend ou lorsque le bloc de messages est connecté à d’autres blocs de messages.

  18. Dans la protected section, définissez la send_message méthode.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    La send_message méthode ressemble propagate_messageà . Toutefois, il envoie les messages de sortie de manière synchrone au lieu de manière asynchrone.

    Le runtime appelle cette méthode pendant une opération d’envoi synchrone, par exemple lorsque vous appelez la fonction concurrency ::send .

La priority_buffer classe contient des surcharges de constructeur qui sont typiques de nombreux types de blocs de messages. Certaines surcharges de constructeur prennent l’accès concurrentiel ::Scheduler ou les objets concurrency ::ScheduleGroup, ce qui permet au bloc de message d’être géré par un planificateur de tâches spécifique. D’autres surcharges de constructeur prennent une fonction de filtre. Les fonctions de filtre permettent aux blocs de messages d’accepter ou de rejeter un message en fonction de sa charge utile. Pour plus d’informations sur les filtres de messages, consultez Blocs de messages asynchrones. Pour plus d’informations sur les planificateurs de tâches, consultez Planificateur de tâches.

Étant donné que la priority_buffer classe trie les messages par priorité, puis par l’ordre dans lequel les messages sont reçus, cette classe est la plus utile lorsqu’elle reçoit des messages de manière asynchrone, par exemple lorsque vous appelez la fonction concurrency ::asend ou lorsque le bloc de messages est connecté à d’autres blocs de messages.

[Haut]

Exemple complet

L’exemple suivant montre la définition complète de la priority_buffer classe.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

L’exemple suivant effectue simultanément un certain nombre d’opérations asend concurrency ::receive sur un priority_buffer objet.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Cet exemple génère l’exemple de sortie suivant.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

La priority_buffer classe commande d’abord les messages par priorité, puis par l’ordre dans lequel il reçoit des messages. Dans cet exemple, les messages avec une priorité numérique supérieure sont insérés vers le devant de la file d’attente.

[Haut]

Compilation du code

Copiez l’exemple de code et collez-le dans un projet Visual Studio, ou collez la définition de la priority_buffer classe dans un fichier nommé priority_buffer.h et le programme de test dans un fichier nommé priority_buffer.cpp , puis exécutez la commande suivante dans une fenêtre d’invite de commandes Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Voir aussi

Procédures pas à pas relatives au runtime d’accès concurrentiel
Blocs de messages asynchrones
Fonctions de passage de messages