Exemplarische Vorgehensweise: Erstellen eines benutzerdefinierten Nachrichtenblocks
In diesem Dokument wird beschrieben, wie ein benutzerdefinierter Nachrichtenblocktyp erstellt wird, um eingehende Nachrichten nach Priorität zu sortieren.
Obwohl die integrierten Nachrichtenblocktypen eine breite Palette von Funktionen bereitstellen, können Sie auch eigene Nachrichtenblocktypen erstellen und anpassen, um die Anforderungen Ihrer Anwendung zu erfüllen. Eine Beschreibung der integrierten Nachrichtenblocktypen, die von der Asynchronous Agents Library bereitgestellt werden, finden Sie unter Asynchrone Nachrichtenblöcke.
Vorbereitungsmaßnahmen
Lesen Sie die folgenden Dokumente, bevor Sie mit dieser exemplarischen Vorgehensweise beginnen:
Abschnitte
Diese exemplarische Vorgehensweise enthält folgende Abschnitte:
Entwerfen eines benutzerdefinierten Nachrichtenblocks
Definieren der priority_buffer-Klasse
Vollständiges Beispiel
Entwerfen eines benutzerdefinierten Nachrichtenblocks
Nachrichtenblöcke sind am Senden und Empfangen von Nachrichten beteiligt. Ein Nachrichtenblock, der Nachrichten sendet, wird als Quellblock bezeichnet. Ein Nachrichtenblock, der Nachrichten empfängt, wird als Zielblock bezeichnet. Ein Nachrichtenblock, der Nachrichten sendet und empfängt, wird als Weitergabeblock bezeichnet. Die Agents Library verwendet die abstrakte Concurrency::ISource-Klasse, um Quellblöcke darzustellen, und die abstrakte Concurrency::ITarget-Klasse, um Zielblöcke darzustellen. Nachrichtenblocktypen, die als Quelle dienen, werden von der ISource-Klasse abgeleitet, während Nachrichtenblocktypen, die als Ziel dienen, von der ITarget-Klasse abgeleitet werden.
Der Nachrichtenblocktyp kann prinzipiell unmittelbar von ISource und ITarget abgeleitet werden. Die Agents Library definiert jedoch drei Basisklassen, deren Funktionalität weitestgehend der aller Nachrichtenblocktypen entspricht. Beispiel: parallelitätssicheres Behandeln von Fehlern und parallelitätssicheres Verbinden von Nachrichtenblöcken. Die Concurrency::source_block-Klasse wird von ISource abgeleitet und sendet Nachrichten an andere Blöcke. Die Concurrency::target_block-Klasse wird von ITarget abgeleitet und empfängt Nachrichten von anderen Blöcken. Die Concurrency::propagator_block-Klasse wird von ISource und ITarget abgeleitet. Sie sendet Nachrichten an andere Blöcke und empfängt Nachrichten von anderen Blöcken. Es wird empfohlen, Infrastrukturdetails mit diesen drei Basisklassen zu behandeln, sodass Sie sich auf das Verhalten des Nachrichtenblocks konzentrieren können.
Die Klassen source_block, target_block und propagator_block sind Vorlagen, die auf der Grundlage eines Typs parametrisiert werden, der die Verbindungen oder Links zwischen Quell- und Zielblöcken verwaltet, sowie auf Grundlage eines Typs, der die Verarbeitung von Nachrichten verwaltet. Die Agents Library definiert zwei Typen für die Linkverwaltung: Concurrency::single_link_registry und Concurrency::multi_link_registry. Die single_link_registry-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit einer Quelle oder einem Ziel. Die multi_link_registry-Klasse ermöglicht das Verknüpfen eines Nachrichtenblocks mit mehreren Quellen oder mehreren Zielen. Die Agents Library definiert eine Klasse zur Verwaltung von Nachrichten: Concurrency::ordered_message_processor. Die ordered_message_processor-Klasse ermöglicht Nachrichtenblöcken die Verarbeitung von Nachrichten in der Reihenfolge ihres Empfangs.
Im folgenden Beispiel wird die Beziehung zwischen Nachrichtenblöcken sowie Quellen und Zielen veranschaulicht. In diesem Beispiel wird die Deklaration der Concurrency::transformer-Klasse veranschaulicht.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Die transformer-Klasse wird von propagator_block abgeleitet und fungiert daher als Quell- sowie als Zielblock. Sie akzeptiert Nachrichten vom Typ _Input und sendet Nachrichten vom Typ _Output. Die transformer-Klasse gibt single_link_registry als Link-Manager für alle Zielblöcke und multi_link_registry als Link-Manager für alle Quellblöcke an. Aus diesem Grund kann ein transformer-Objekt ein Ziel sowie eine unbegrenzte Anzahl von Quellen haben.
Eine Klasse, die von source_block abgleitet wird, muss sechs Methoden implementieren: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message und resume_propagation. Eine Klasse, die von target_block abgeleitet wird, muss die propagate_message-Methode implementieren und kann die send_message-Methode implementieren. Ableitungen von propagator_block sowie von source_block und target_block sind funktional äquivalent.
Die propagate_to_any_targets-Methode wird von der Laufzeit aufgerufen, um alle eingehenden Nachrichten synchron oder asynchron zu verarbeiten und alle ausgehenden Nachrichten weiterzugeben. Die accept_message-Methode wird von Zielblöcken aufgerufen, um Nachrichten zu akzeptieren. Viele Nachrichtenblocktypen wie unbounded_buffer senden Nachrichten nur an das erste Ziel, das diese empfangen würde. Daher wird der Besitz der Nachricht auf das Ziel übertragen. Andere Nachrichtenblocktypen wie Concurrency::overwrite_buffer bieten Nachrichten für alle entsprechenden Zielblöcke an. overwrite_buffer erstellt daher eine Kopie der Nachricht für alle diesbezüglichen Ziele.
Mit den Methoden reserve_message, consume_message, release_message und resume_propagation können Nachrichtenblöcke an der Reservierung von Nachrichten teilnehmen. Zielblöcke rufen die reserve_message-Methode auf, wenn eine Nachricht für sie bereitgestellt wird, die zur späteren Verwendung reserviert werden muss. Nach dem Reservieren einer Nachricht durch den Zielblock kann dieser die consume_message-Methode aufrufen, um die Nachricht zu verarbeiten, oder die release_message-Methode, um die Reservierung abzubrechen. Analog zur accept_message-Methode kann die Implementierung von consume_message den Besitz der Nachricht übertragen oder eine Kopie der Nachricht zurückgeben. Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die resume_propagation-Methode von der Laufzeit aufgerufen. Diese Methode setzt die Nachrichtenweitergabe i. d. R. mit der nächsten Nachricht in der Warteschlange fort.
Die propagate_message-Methode wird von der Laufzeit aufgerufen, um eine Nachricht asynchron von einem anderen Block zum aktuellen zu übertragen. Die send_message-Methode ähnelt der propagate_message-Methode, sendet die Nachrichten im Unterschied zu dieser jedoch synchron an die Zielblöcke. Die Standardimplementierung von send_message weist alle eingehenden Nachrichten zurück. Die Laufzeit ruft keine der Methoden auf, wenn von der Nachricht nicht die optionale Filterfunktion übergeben wird, die dem Zielblock zugeordnet ist. Weitere Informationen zu Nachrichtenfiltern finden Sie unter Asynchrone Nachrichtenblöcke.
[Nach oben]
Definieren der priority_buffer-Klasse
Die priority_buffer-Klasse ist ein benutzerdefinierter Nachrichtenblocktyp, der eingehende Meldungen zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs sortiert. Die priority_buffer-Klasse ähnelt der Concurrency::unbounded_buffer-Klasse, da sie eine Warteschlange mit Nachrichten enthält und sowohl als Quell- als auch als Zielnachrichtenblock fungiert und mehrere Quellen sowie Ziele haben kann. unbounded_buffer legt als Kriterium für die Weitergabe von Nachrichten jedoch nur die Reihenfolge ihres Empfangs aus den Quellen zugrunde.
Die priority_buffer-Klasse empfängt Nachrichten vom Typ std::tuple mit dem PriorityType- und dem Type-Element. PriorityType verweist auf den Typ, der die Priorität einer Nachricht angibt; Type verweist auf den Datenteil der Nachricht. Die priority_buffer-Klasse sendet Nachrichten vom Typ Type. Die priority_buffer-Klasse verwaltet auch zwei Nachrichtenwarteschlangen: ein std::priority_queue-Objekt für eingehende Nachrichten und ein std::queue-Objekt für ausgehende Nachrichten. Das Sortieren von Nachrichten nach der Priorität ist hilfreich, wenn ein priority_buffer-Objekt mehrere Nachrichten gleichzeitig oder bevor diese von Consumern gelesen werden empfängt.
Zusätzlich zu den sieben Methoden, die von einer Klasse implementiert werden müssen, die von propagator_block abgeleitet wird, überschreibt die priority_buffer-Klasse die noch die link_target_notification-Methode und die send_message-Methode. Die priority_buffer-Klasse definiert außerdem zwei öffentliche Hilfsmethoden (, enqueue und dequeue) sowie eine private Hilfsmethode ( propagate_priority_order).
Im folgenden Verfahren wird beschrieben, wie die priority_buffer-Klasse implementiert wird.
So definieren Sie die priority_buffer-Klasse
Erstellen Sie eine Headerdatei in C++ mit dem Namen priority_buffer.h. Sie können auch eine bestehende Headerdatei verwenden, die Teil Ihres Projekts ist.
Fügen Sie den folgenden Code unter priority_buffer.h hinzu.
#pragma once #include <agents.h> #include <queue>
Definieren Sie im std-Namespace die Spezialisierungen von std::less und std::greater für Concurrency::message-Objekte.
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<Concurrency::message<tuple<PriorityType,Type>>*> { typedef Concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<Concurrency::message<tuple<PriorityType, Type>>*> { typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
Die priority_buffer-Klasse speichert message-Objekte in einem priority_queue-Objekt. Mit diesen Typspezialisierungen können die Nachrichten von der Prioritätswarteschlange anhand ihrer Priorität sortiert werden. Die Priorität ist das erste Element des tuple-Objekts.
Deklarieren Sie die priority_buffer-Klasse im Concurrency-Namespace.
namespace Concurrency { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public propagator_block<multi_link_registry<ITarget<Type>>, multi_link_registry<ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
Die priority_buffer-Klasse wird von propagator_block abgeleitet. Sie kann daher Meldungen senden und empfangen. Die priority_buffer-Klasse mehrere Ziele aufweisen, die Nachrichten vom Typ Type empfangen. Sie kann außerdem mehrere Quellen aufweisen, die Nachrichten vom Typ tuple<PriorityType, Type> senden.
Fügen Sie im private-Abschnitt der priority_buffer-Klasse die folgenden Membervariablen hinzu.
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< message<_Source_type>*, std::vector<message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. critical_section _input_lock; // Stores outgoing messages. std::queue<message<_Target_type>*> _output_messages;
Das priority_queue-Objekt enthält eingehende Nachrichten, das queue-Objekt enthält ausgehende Nachrichten. Ein priority_buffer-Objekt kann mehrere Nachrichten gleichzeitig empfangen. Das critical_section-Objekt synchronisiert den Zugriff auf die Warteschlange für eingehende Nachrichten.
Definieren Sie den Kopierkonstruktor und den Zuweisungsoperator im Abschnitt private. Dadurch wird verhindert, dass priority_queue-Objekte zugewiesen werden können.
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
Definieren Sie im public-Abschnitt die Konstruktoren, die in zahlreichen Nachrichtenblocktypen verwendet werden. Definieren Sie auch den Destruktor.
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
Definieren Sie im public-Abschnitt die enqueue-Methode und die dequeue-Methode. Diese Hilfsmethoden bieten eine alternative Möglichkeit, Nachrichten an ein priority_buffer-Objekt zu senden und von diesem zu empfangen.
// Sends an item to the message block. bool enqueue(Type const& item) { return Concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
Definieren Sie die propagate_to_any_targets-Methode im protected-Abschnitt.
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(message<_Target_type>*) { // Retrieve the message from the front of the input queue. message<_Source_type>* input_message = NULL; { critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. message<_Target_type>* output_message = new message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
Die propagate_to_any_targets-Methode überträgt die Nachricht, die sich in der Eingabewarteschlange an erster Stelle befindet, an die Ausgabewarteschlange, und gibt alle Nachrichten an die Ausgabewarteschlange weiter.
Definieren Sie die accept_message-Methode im protected-Abschnitt.
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual message<_Target_type>* accept_message(runtime_object_identity msg_id) { message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
Wenn die accept_message-Methode von einem Zielblock aufgerufen wird, wird der Besitz der Nachricht von der priority_buffer-Klasse auf den ersten Zielblock übertragen, der diesen akzeptiert. (Dieses Verhalten ist vergleichbar mit unbounded_buffer).
Definieren Sie die reserve_message-Methode im protected-Abschnitt.
// Reserves a message that was previously offered by this block. virtual bool reserve_message(runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
Die priority_buffer-Klasse ermöglicht einem Zielblock, eine Nachricht zu reservieren, wenn der Bezeichner der bereitgestellten Nachricht mit dem Bezeichner der Nachricht übereinstimmt, die an erster Position in der Warteschlange steht. Anders ausgedrückt kann die Nachricht von einem Ziel reserviert werden, wenn vom priority_buffer-Objekt noch keine weitere Nachricht empfangen und die aktuelle Nachricht noch nicht weitergegeben wurde.
Definieren Sie die consume_message-Methode im protected-Abschnitt.
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual message<Type>* consume_message(runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
Die consume_message-Methode wird von einem Zielblock aufgerufen, um den Besitz der reservierten Methode zu übertragen.
Definieren Sie die release_message-Methode im protected-Abschnitt.
// Releases a previous message reservation. virtual void release_message(runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
Die release_message-Methode wird von einem Zielblock aufgerufen, um die Reservierung einer Nachricht abzubrechen.
Definieren Sie die resume_propagation-Methode im protected-Abschnitt.
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
Nachdem eine reservierte Nachricht von einem Zielblock verarbeitet oder freigegeben wurde, wird die resume_propagation-Methode von der Laufzeit aufgerufen. Diese Methode gibt alle Nachrichten weiter, die sich in der Ausgabewarteschlange befinden.
Definieren Sie die link_target_notification-Methode im protected-Abschnitt.
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
Die Membervariable _M_pReservedFor wird von der Basisklasse source_block definiert. Diese Membervariable zeigt ggf. auf den Zielblock mit der Reservierung für die Nachricht, die sich an erster Stelle in der Warteschlange befindet. link_target_notification wird von der Laufzeit aufgerufen, wenn ein neues Ziel mit dem priority_buffer-Objekt verknüpft wird. Diese Methode gibt alle Nachrichten in der Ausgabewarteschlange weiter, wenn kein Ziel eine Reservierung aufweist.
Definieren Sie die propagate_priority_order-Methode im private-Abschnitt.
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. message<_Target_type> * message = _output_messages.front(); message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
Diese Methode gibt alle Nachrichten von der Ausgabewarteschlange weiter. Jede Nachricht in der Warteschlange wird für alle Zielblöcke bereitgestellt, bis einer der Zielblöcke die Meldung akzeptiert. Die priority_buffer-Klasse behält die Reihenfolge der ausgehenden Nachrichten bei. Daher muss die erste Nachricht in der Ausgabewarteschlange von einem Zielblock akzeptiert werden, bevor eine andere Meldung von dieser Methode für die Zielblöcke bereitgestellt wird.
Definieren Sie die propagate_message-Methode im protected-Abschnitt.
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual message_status propagate_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
Die propagate_message-Methode ermöglicht der priority_buffer-Klasse als Nachrichtenempfänger oder -ziel fungieren. Diese Methode empfängt die vom angegebenen Quellblock bereitgestellte Nachricht und fügt sie in die Prioritätswarteschlange ein. Anschließend werden alle Ausgabenachrichten von der propagate_message-Methode asynchron an die Zielblöcke gesendet.
Diese Methode wird von der Laufzeit aufgerufen, wenn Sie die Concurrency::asend-Funktion aufrufen oder wenn der Nachrichtenblock mit anderen Nachrichtenblocks verbunden wird.
Definieren Sie die send_message-Methode im protected-Abschnitt.
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual message_status send_message(message<_Source_type>* message, ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
Die send_message-Methode ähnelt der propagate_message-Methode. Im Unterschied zu dieser sendet sie die Ausgabemeldungen jedoch synchron.
Diese Methode wird von der Laufzeit im Rahmen eines synchronen Sendevorgangs aufgerufen, beispielsweise beim Aufrufen der Concurrency::send-Funktion.
Die priority_buffer-Klasse enthält Konstruktorüberladungen, die in vielen Nachrichtenblocktypen verwendet werden. Einige Konstruktorüberladungen akzeptieren Concurrency::Scheduler- oder Concurrency::ScheduleGroup-Objekte, mit denen der Nachrichtenblock von einem bestimmten Taskplaner verwaltet werden kann. Andere Konstruktorüberladungen übernehmen eine Filterfunktion. Filterfunktionen ermöglichen Nachrichtenblöcken das Annehmen oder Ablehnen von Nachrichten anhand der Nutzlast. Weitere Informationen zu Nachrichtenfiltern finden Sie unter Asynchrone Nachrichtenblöcke. Weitere Informationen zu Taskplanern finden Sie unter Taskplaner (Concurrency Runtime).
Da Nachrichten von der priority_buffer-Klasse zunächst nach der Priorität und anschließend nach der Empfangsreihenfolge sortiert werden, ist diese Klasse am hilfreichsten, wenn Nachrichten asynchron empfangen werden, beispielsweise wenn die Concurrency::asend-Funktion aufgerufen wird oder wenn der Nachrichtenblock mit anderen Nachrichtenblocks verbunden wird.
[Nach oben]
Vollständiges Beispiel
Im folgenden Beispiel wird die vollständige Definition der priority_buffer-Klasse veranschaulicht.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*>
{
typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*>
{
typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace Concurrency
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer :
public propagator_block<multi_link_registry<ITarget<Type>>,
multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return Concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual message_status propagate_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual message_status send_message(message<_Source_type>* message,
ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
{
message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual message<Type>* consume_message(runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
message<_Source_type>* input_message = NULL;
{
critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
message<_Target_type>* output_message =
new message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
message<_Target_type> * message = _output_messages.front();
message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
message<_Source_type>*,
std::vector<message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
critical_section _input_lock;
// Stores outgoing messages.
std::queue<message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Im folgenden Beispiel wird eine Reihe von asend- und Concurrency::receive-Vorgängen für ein priority_buffer-Objekt ausgeführt.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace Concurrency;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Dieses Beispiel erzeugt die folgende Beispielausgabe.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Die priority_buffer-Klasse ordnet die Nachrichten zunächst nach der Priorität und anschließend nach der Reihenfolge ihres Empfangs. In diesem Beispiel werden Nachrichten mit höherer numerischer Priorität am Anfang der Warteschlange eingefügt.
[Nach oben]
Kompilieren des Codes
Kopieren Sie den Beispielcode und fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie die Definition der priority_buffer-Klasse in die Datei priority_buffer.h und das Testprogramm in die Datei priority_buffer.cpp ein, und führen Sie dann den folgenden Befehl in einem Visual Studio 2010-Eingabeaufforderungsfenster aus.
cl.exe /EHsc priority_buffer.cpp