Návod: Vytvoření vlastního bloku zpráv
Tento dokument popisuje, jak vytvořit vlastní typ bloku zpráv, který bude řadit příchozí zprávy podle priority.
I když integrované typy bloků zpráv poskytují širokou škálu funkcí, můžete vytvořit vlastní typ bloku zpráv a přizpůsobit ho tak, aby splňoval požadavky vaší aplikace. Popis předdefinovaných typů bloků zpráv, které poskytuje knihovna asynchronních agentů, naleznete v tématu Asynchronní bloky zpráv.
Požadavky
Před zahájením tohoto návodu si přečtěte následující dokumenty:
Oddíly
Tento názorný postup obsahuje následující části:
Návrh vlastního bloku zpráv
Bloky zpráv se účastní odesílání a přijímání zpráv. Blok zpráv, který odesílá zprávy, se označuje jako zdrojový blok. Blok zpráv, který přijímá zprávy, se označuje jako cílový blok. Blok zpráv, který odesílá i přijímá zprávy, se označuje jako blok šíření. Knihovna agentů používá abstraktní třídu concurrency::ISource k reprezentaci zdrojových bloků a abstraktní třídy concurrency::ITarget k reprezentaci cílových bloků. Typy bloku zpráv, které fungují jako zdroje, odvozeny od ISource
; typy bloku zpráv, které fungují jako cíle odvozeny od ITarget
.
I když můžete typ bloku zprávy odvodit přímo z ISource
a ITarget
, Knihovna Agents definuje tři základní třídy, které provádějí většinu funkcí, které jsou společné pro všechny typy bloků zpráv, například zpracování chyb a propojení bloků zpráv způsobem bezpečným způsobem souběžnosti. Concurrency ::source_block třída je odvozena a ISource
odesílá zprávy do jiných bloků. Concurrency ::target_block třída je odvozena a ITarget
přijímá zprávy z jiných bloků. Concurrency::p ropagator_block třída je odvozena a ISource
ITarget
odesílá zprávy do jiných bloků a přijímá zprávy z jiných bloků. Tyto tři základní třídy doporučujeme použít ke zpracování podrobností o infrastruktuře, abyste se mohli zaměřit na chování bloku zpráv.
source_block
, target_block
a propagator_block
třídy jsou šablony, které jsou parametrizovány u typu, který spravuje připojení nebo propojení mezi zdrojovými a cílovými bloky a typem, který spravuje způsob zpracování zpráv. Knihovna agentů definuje dva typy, které provádějí správu propojení, souběžnost::single_link_registry a souběžnost::multi_link_registry. Třída single_link_registry
umožňuje propojení bloku zpráv s jedním zdrojem nebo s jedním cílem. Třída multi_link_registry
umožňuje propojení bloku zpráv s více zdroji nebo několika cíli. Knihovna agentů definuje jednu třídu, která provádí správu zpráv, souběžnost::ordered_message_processor. Třída ordered_message_processor
umožňuje blokům zpráv zpracovávat zprávy v pořadí, ve kterém je přijímá.
Pokud chcete lépe pochopit, jak bloky zpráv souvisejí s jejich zdroji a cíli, zvažte následující příklad. Tento příklad ukazuje deklaraci třídy concurrency::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Třída transformer
je odvozena z propagator_block
, a proto funguje jako zdrojový blok i jako cílový blok. Přijímá zprávy typu _Input
a odesílá zprávy typu _Output
. Třída transformer
určuje single_link_registry
jako správce propojení pro všechny cílové bloky a multi_link_registry
jako správce propojení pro všechny zdrojové bloky. transformer
Objekt proto může mít až jeden cíl a neomezený počet zdrojů.
Třída odvozená od source_block
musí implementovat šest metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message a resume_propagation. Třída, která je odvozena od target_block
musí implementovat propagate_message metoda a může volitelně implementovat send_message metoda. Odvození z propagator_block
funkce je funkčně ekvivalentní odvození z obou source_block
a target_block
.
Metoda propagate_to_any_targets
je volána modulem runtime pro asynchronní nebo synchronní zpracování všech příchozích zpráv a šíření odchozích zpráv. Metoda accept_message
je volána cílovými bloky pro příjem zpráv. Mnoho typů bloků zpráv, například unbounded_buffer
, odesílat zprávy pouze do prvního cíle, který by ho obdržel. Proto přenese vlastnictví zprávy do cíle. Další typy bloků zpráv, jako je souběžnost::overwrite_buffer, nabízejí zprávy každému z cílových bloků. overwrite_buffer
Proto vytvoří kopii zprávy pro každý z jeho cílů.
Metody reserve_message
, consume_message
a release_message
resume_propagation
metody umožňují, aby se bloky zpráv mohly účastnit rezervace zpráv. Cílové bloky volají metodu reserve_message
, když se jim zobrazí zpráva, a musí si ji rezervovat pro pozdější použití. Jakmile cílový blok rezervuje zprávu, může zavolat metodu consume_message
, která tuto zprávu spotřebuje, nebo metodu release_message
zrušení rezervace. Stejně jako u accept_message
této metody může implementace consume_message
buď převést vlastnictví zprávy, nebo vrátit kopii zprávy. Jakmile cílový blok buď spotřebuje nebo uvolní rezervovanou zprávu, modul runtime volá metodu resume_propagation
. Tato metoda obvykle pokračuje v šíření zpráv, počínaje další zprávou ve frontě.
Modul runtime volá metodu propagate_message
, která asynchronně přenese zprávu z jiného bloku do aktuálního bloku. Metoda send_message
se podobá propagate_message
, s tím rozdílem, že synchronně místo asynchronně odešle zprávu do cílových bloků. Výchozí implementace send_message
odmítne všechny příchozí zprávy. Modul runtime nevolá některou z těchto metod, pokud zpráva nepředá volitelnou funkci filtru přidruženou k cílovému bloku. Další informace ofiltrch
[Nahoře]
Definování třídy priority_buffer
Třída priority_buffer
je vlastní typ bloku zprávy, který se bude řadit jako první příchozí zprávy podle priority a potom podle pořadí, ve kterém se zprávy přijímají. Třída priority_buffer
se podobá souběžnosti::unbounded_buffer třída, protože obsahuje frontu zpráv, a také proto, že funguje jako zdroj i cílový blok zpráv a může mít více zdrojů i více cílů. Šíření unbounded_buffer
zpráv se však zakládá pouze na pořadí, ve kterém přijímá zprávy ze svých zdrojů.
Třída priority_buffer
přijímá zprávy typu std::tuple , které obsahují PriorityType
a Type
prvky. PriorityType
odkazuje na typ, který obsahuje prioritu každé zprávy; Type
odkazuje na datovou část zprávy. Třída priority_buffer
odesílá zprávy typu Type
. Třída priority_buffer
také spravuje dvě fronty zpráv: std::p riority_queue objekt pro příchozí zprávy a objekt std::queue pro odchozí zprávy. Řazení zpráv podle priority je užitečné, když priority_buffer
objekt přijímá více zpráv současně nebo když přijímá více zpráv před čtením všech zpráv příjemci.
Kromě sedmi metod, které třída odvozená z propagator_block
musí implementovat, priority_buffer
třída také přepíše link_target_notification
a send_message
metody. Třída priority_buffer
také definuje dvě veřejné pomocné metody enqueue
, a dequeue
soukromé pomocné metody, propagate_priority_order
.
Následující postup popisuje, jak implementovat priority_buffer
třídu.
Definice třídy priority_buffer
Vytvořte soubor hlaviček jazyka C++ a pojmenujte ho
priority_buffer.h
. Alternativně můžete použít existující hlavičkový soubor, který je součástí projektu.Do
priority_buffer.h
pole přidejte následující kód.#pragma once #include <agents.h> #include <queue>
std
V oboru názvů definujte specializace std::less a std::greater, které se chovají na objekty concurrency::message.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Třída
priority_buffer
ukládámessage
objekty do objektupriority_queue
. Tyto specializace typu umožňují frontě priority řadit zprávy podle jejich priority. Priorita je první prvek objektutuple
.concurrencyex
V oboru názvů deklarujtepriority_buffer
třídu.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Třída
priority_buffer
je odvozena zpropagator_block
. Proto může odesílat i přijímat zprávy. Třídapriority_buffer
může mít více cílů, které přijímají zprávy typuType
. Může mít také více zdrojů, které odesílají zprávy typutuple<PriorityType, Type>
.private
V částipriority_buffer
třídy přidejte následující členské proměnné.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
Objekt uchovává příchozí zprávy.
queue
Objektpriority_queue
uchovává odchozí zprávy.priority_buffer
Objekt může přijímat více zpráv současně;critical_section
objekt synchronizuje přístup do fronty vstupních zpráv.private
V části definujte konstruktor kopírování a operátor přiřazení. Tím zabránítepriority_queue
přiřazení objektů.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public
V části definujte konstruktory, které jsou společné pro mnoho typů bloků zpráv. Destruktor destruktor také destruktor destruktor definuje.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public
V části definujte metodyenqueue
adequeue
. Tyto pomocné metody poskytují alternativní způsob, jak odesílat zprávy a přijímat zprávy z objektupriority_buffer
.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected
V části definujte metodupropagate_to_any_targets
.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Metoda
propagate_to_any_targets
přenese zprávu, která je před vstupní frontou, do výstupní fronty a rozšíří všechny zprávy ve výstupní frontě.protected
V části definujte metoduaccept_message
.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Když cílový blok volá metodu
accept_message
,priority_buffer
třída přenese vlastnictví zprávy do prvního cílového bloku, který ji přijme. (Podobá se chováníunbounded_buffer
.)protected
V části definujte metodureserve_message
.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
Třída
priority_buffer
umožňuje cílovému bloku rezervovat zprávu, pokud zadaný identifikátor zprávy odpovídá identifikátoru zprávy, která je na přední straně fronty. Jinými slovy, cíl si může zprávu rezervovat, pokudpriority_buffer
objekt ještě nepřijal další zprávu a ještě nebylo rozšířeno o aktuální zprávu.protected
V části definujte metoduconsume_message
.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Cílové blokové volání
consume_message
pro přenos vlastnictví zprávy, kterou si rezervovala.protected
V části definujte metodurelease_message
.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Cílová bloková volání
release_message
, která zruší rezervaci zprávy.protected
V části definujte metoduresume_propagation
.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Volání modulu runtime
resume_propagation
po cílovém bloku buď spotřebuje, nebo uvolní rezervovanou zprávu. Tato metoda rozšíří všechny zprávy, které jsou ve výstupní frontě.protected
V části definujte metodulink_target_notification
.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
Proměnná
_M_pReservedFor
člena je definována základní třídou ,source_block
. Tato proměnná člena odkazuje na cílový blok, pokud existuje, který má rezervaci na zprávu, která je na přední straně výstupní fronty. Modul runtime volálink_target_notification
, když je nový cíl propojený s objektempriority_buffer
. Tato metoda rozšíří všechny zprávy, které jsou ve výstupní frontě, pokud žádný cíl neudržuje rezervaci.private
V části definujte metodupropagate_priority_order
.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Tato metoda rozšíří všechny zprávy z výstupní fronty. Každá zpráva ve frontě se nabízí každému cílovému bloku, dokud jeden z cílových bloků zprávu nepřijímá. Třída
priority_buffer
zachovává pořadí odchozích zpráv. Proto musí být první zpráva ve výstupní frontě přijata cílovým blokem předtím, než tato metoda nabídne jakékoli další zprávy cílovým blokům.protected
V části definujte metodupropagate_message
.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Metoda
propagate_message
umožňujepriority_buffer
, aby třída fungovala jako příjemce zprávy nebo cíl. Tato metoda obdrží zprávu, která je nabízena zadaným zdrojovým blokem, a vloží tuto zprávu do fronty priority. Metodapropagate_message
pak asynchronně odešle všechny výstupní zprávy do cílových bloků.Modul runtime volá tuto metodu při volání funkce concurrency::asend nebo při připojení bloku zprávy k jiným blokům zpráv.
protected
V části definujte metodusend_message
.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Metoda
send_message
se podobápropagate_message
. Ale odesílá výstupní zprávy synchronně místo asynchronně.Modul runtime volá tuto metodu během synchronní operace odesílání, například při volání funkce concurrency::send .
Třída priority_buffer
obsahuje přetížení konstruktoru, které jsou typické v mnoha typech bloků zpráv. Některá přetížení konstruktoru mají souběžnost::Scheduler nebo concurrency::ScheduleGroup objekty, které umožňují správu bloku zpráv konkrétním plánovačem úloh. Jiné přetížení konstruktoru přebírají funkci filtru. Funkce filtrování umožňují blokům zpráv přijmout nebo odmítnout zprávu na základě datové části. Další informace ofiltrch Další informace o plánovači úloh naleznete v tématu Plánovač úloh.
Vzhledem k tomu, že priority_buffer
třída objednává zprávy podle priority a pořadí, ve kterém jsou zprávy přijaty, je tato třída nejužitečnější, když přijímá zprávy asynchronně, například při volání souběžnosti::asend nebo když je blok zpráv připojený k jiným blokům zpráv.
[Nahoře]
Kompletní příklad
Následující příklad ukazuje úplnou definici priority_buffer
třídy.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Následující příklad souběžně provádí řadu asend
operací a souběžnost::receive operace u objektu priority_buffer
.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Tento příklad vytvoří následující ukázkový výstup.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Třída priority_buffer
objednává zprávy jako první podle priority a potom podle pořadí, ve kterém přijímá zprávy. V tomto příkladu se zprávy s větší číselnou prioritou vloží do přední části fronty.
[Nahoře]
Probíhá kompilace kódu
Zkopírujte ukázkový kód a vložte ho do projektu sady Visual Studio nebo vložte definici priority_buffer
třídy do souboru s názvem priority_buffer.h
a testovacím programem v souboru s názvem priority_buffer.cpp
a potom v okně příkazového řádku sady Visual Studio spusťte následující příkaz.
cl.exe /EHsc priority_buffer.cpp
Viz také
Návody pro Concurrency Runtime
Asynchronní bloky zpráv
Funkce pro předávání zpráv