Empfohlene Vorgehensweisen in der Asynchronous Agents Library
In diesem Dokument wird die effektive Verwendung der Asynchronous Agents Library beschrieben. Die Agents Library begünstigt ein akteurbasiertes Programmiermodell und die prozessinterne Nachrichtenübergabe für simple Datenfluss- und Pipelinesaufgaben.
Weitere Informationen zur Agents-Bibliothek finden Sie unter "Asynchrone Agents"-Bibliothek.
Abschnitte
Dieses Dokument enthält folgende Abschnitte:
Verwenden von Agents zum Isolieren des Zustands
Die Agents Library stellt Alternativen zum Freigabezustand bereit, indem sie das Verbinden isolierter Komponenten durch einen asynchronen Nachrichtenübergabemechanismum ermöglicht. Asynchrone Agents sind am effektivsten, wenn der interne Zustand von anderen Komponenten isoliert werden kann. Aufgrund der Isolierung des Status wirken mehrere Komponenten i. d. R. nicht auf freigegebene Daten. Die Isolierung des Status ermöglicht eine Skalierung der Anwendung, da Konflikte im freigegebenen Speicher reduziert werden. Ferner wird dadurch die Wahrscheinlichkeit von Deadlocks und Racebedingungen verringert, da der Zugriff auf die freigegebenen Daten nicht zwischen den Komponenten synchronisiert werden muss.
Status in einem Agent werden i. d. R. isoliert, indem Datenmember im Abschnitt private
oder im Abschnitt protected
der Agent-Klasse platziert und Statusänderungen mit Nachrichtenpuffern kommuniziert werden. Das folgende Beispiel zeigt die basic_agent
Klasse, die von concurrency::agent abgeleitet wird. Die basic_agent
-Klasse verwendet zwei Meldungspuffer zur Kommunikation mit externen Komponenten. Ein Nachrichtenpuffer enthält eingehende Meldungen, der andere Nachrichtenpuffer enthält ausgehende Nachrichten.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrieves the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
Vollständige Beispiele zum Definieren und Verwenden von Agents finden Sie unter Walkthrough: Creating an Agent-Based Application and Walkthrough: Creating a Dataflow Agent.
Verwenden eines Einschränkungsmechanismus zum Einschränken der Anzahl von Nachrichten in einer Datenpipeline
Viele Nachrichtenpuffertypen, z . B. Parallelität::unbounded_buffer, können eine unbegrenzte Anzahl von Nachrichten enthalten. Wenn Nachrichten vom Nachrichtenproducer schneller an eine Datenpipeline gesendet werden, als diese vom Consumer verarbeitet werden können, kann der Speicherstatus der Anwendung auf Speichermangel hinweisen. Mit einem Einschränkungsmechanismus wie einem Semaphor können Sie die Anzahl der Nachrichten begrenzen, die zu einem gegebenen Zeitpunkt gleichzeitg in einer Datenpipepline aktiv sind.
Im Folgenden einfachen Beispiel wird veranschaulicht, wie die Anzahl der Nachrichten in einer Datenpipeline mit einem Semaphor eingeschränkt wird. Die Datenpipeline verwendet die Parallelität::Wait-Funktion , um einen Vorgang zu simulieren, der mindestens 100 Millisekunden benötigt. Da die Nachrichten vom Absender schneller erzeugt werden, als diese vom Consumer verarbeitet werden können, wird in diesem Beispiel die semaphore
-Klasse definiert, sodass die Anzahl der aktiven Nachrichten von der Anwendung eingeschränkt werden kann.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
Die Pipeline wird vom semaphore
-Objekt eingeschränkt, sodass maximal zwei Nachrichten gleichzeitig verarbeitet werden können.
Der Producer in diesem Beispiel sendet verhältnismäßig wenig Nachrichten an den Consumer. Daher ist dieses Beispiel ungeeignet, um eine Situation mit einem Mangel an Arbeitsspeicher zu veranschaulichen. Dieser Mechanismus ist jedoch hilfreich, wenn eine Datenpipeline eine relativ hohe Anzahl von Meldungen enthält.
Weitere Informationen zum Erstellen der in diesem Beispiel verwendeten Semaphorklasse finden Sie unter How to: Use the Context Class to Implement a Cooperative Semaphor.
Keine feinkörnige Arbeit in einer Datenpipeline ausführen
Die Agents Library ist besonders hilfreich, wenn die Arbeit, die von einer Datenpipeline ausgeführt wird, eher simpel ist. Beispielsweise könnte eine Anwendungskomponente Daten aus einer Datei oder einer Netzwerkverbindung lesen und ab und zu Daten an eine andere Komponente senden. Das Protokoll, das von der Agents-Bibliothek zum Verteilen von Nachrichten verwendet wird, bewirkt, dass der Nachrichtenübergabemechanismus mehr Aufwand hat als die parallelen Konstrukte der Aufgabe, die von der Parallel Patterns Library (PPL) bereitgestellt werden. Deshalb müssen Sie sicherstellen, dass die Arbeit, die von einer Datenpipeline ausgeführt wird, lang genug dauert, um diesen Mehraufwand auszugleichen.
Obwohl eine Datenpipeline am effektivsten ist, wenn die Aufgaben simpel sind, können in jeder Phase der Datenpipeline PPL-Konstrukte wie Aufgabengruppen und parallele Algorithmen verwendet werden, um differenziertere Aufgaben auszuführen. Ein Beispiel für ein grobkörniges Datennetzwerk, das in jeder Verarbeitungsphase feinkörnige Parallelität verwendet, finden Sie unter Walkthrough: Creating an Image Processing Network.
Große Nachrichtennutzlasten nicht nach Wert übergeben
In einigen Fällen wird von der Laufzeit eine Kopie einer Nachricht erstellt und von einem Nachrichtenpuffer an einen anderen Nachrichtenpuffer übergeben. Die Klasse "concurrency::overwrite_buffer" bietet beispielsweise eine Kopie jeder Nachricht, die sie an jedes seiner Ziele empfängt. Die Laufzeit erstellt auch eine Kopie der Nachrichtendaten, wenn Sie Nachrichtenübergabefunktionen verwenden, z . B. Parallelität::Send und Parallelität::Empfangen , um Nachrichten in einen Nachrichtenpuffer zu schreiben und nachrichten aus einem Nachrichtenpuffer zu lesen. Auch wenn dieser Mechanismus dazu beiträgt, das Risiko zu verringern, dass freigegebene Daten gleichzeitig geschrieben werden, kann die Arbeitsspeicherleistung darunter leiden, dass die Nachrichten relativ hoch ist.
Sie können die Arbeitsspeicherleistung beim Übergeben von Nachrichten mit einer großen Nutzlast verbessern, indem Sie Zeiger und Verweise verwenden. Im folgenden Beispiel wird die Übergabe umfangreicher Nachrichten anhand des Werts mit der Übergabe von Zeigern auf den gleichen Nachrichtentyp verglichen. Im Beispiel werden zwei Agent-Typen definiert, producer
und consumer
, die auf message_data
-Objekte angewendet werden. In dem Beispiel wird die Zeit, die der Producer benötigt, um mehrere message_data
-Objekte an den Consumer zu senden, mit der Zeit verglichen, die der Producer-Agent benötigt, um mehrere Zeiger auf message_data
-Objekte an den Consumer zu senden.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Dieses Beispiel erzeugt die folgende Beispielausgabe:
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
Die Version mit Zeiger weist eine bessere Leistungauf, da von der Laufzeit während der Übergabe vom Producer an den Consumer nicht eine vollständige Kopie jedes message_data
-Objekts erstellt werden muss.
Verwenden von shared_ptr in einem Datennetzwerk, wenn der Besitz nicht definiert ist
Beim Senden von Nachrichten anhand des Zeigers über eine Pipeline oder ein Netzwerk zur Übergabe von Nachrichten wird der Arbeitsspeicher für jede Nachricht i. d. R. am Anfang des Netzwerks zugewiesen und am Ende des Netzwerks freigegeben. Obwohl dieser Mechanismus normalerweise gut funktioniert, kann er jedoch in einigen Fällen kaum oder nicht verwendet werden. Betrachten Sie beispielsweise den Fall eines Datennetzwerkes mit mehreren Endknoten. In diesem Fall gibt es keine klare Position, um den Arbeitsspeicher für die Nachrichten freizugeben.
Um dieses Problem zu lösen, können Sie einen Mechanismus verwenden, z. B. "std::shared_ptr", mit dem ein Zeiger mehrere Komponenten besitzen kann. Wenn das endgültige shared_ptr
-Objekt zerstört wird, das eine Ressource besitzt, wird auch die Ressource freigegeben.
Im folgenden Beispiel wird veranschaulicht, wie mit shared_ptr
Zeigerwerte für mehrere Nachrichtenpuffer gemeinsam verwendet werden können. Im Beispiel wird ein Parallelitätsobjekt::overwrite_buffer-Objekt mit drei Parallelitätsobjekten::call-Objekten verbunden. Die overwrite_buffer
-Klasse stellt Nachrichten für die einzelnen Ziele bereit. Da das Datennetzwerk am Ende mehrere Besitzer der Daten aufweist, werden die einzelnen shared_ptr
-Objekte mit call
in die Lage versetzt, den Besitz der Nachrichten zu teilen.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Dieses Beispiel erzeugt die folgende Beispielausgabe:
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
Siehe auch
Bewährte Methoden im Zusammenhang mit der Concurrency Runtime
Asynchrone Agents Library
Exemplarische Vorgehensweise: Erstellen einer agentbasierten Anwendung
Exemplarische Vorgehensweise: Erstellen eines Datenfluss-Agent
Exemplarische Vorgehensweise: Erstellen eines Bildverarbeitungsnetzwerks
Bewährte Methoden in der Parallel Patterns Library
Allgemein bewährte Methoden in der Concurrency Runtime