非同期エージェント ライブラリ

非同期エージェント ライブラリ (単にエージェント ライブラリとも呼ばれます) は、同時実行対応のアプリケーション開発の保全性を向上できるプログラミング モデルを提供します。 エージェント ライブラリは、アクターベースのプログラミング モデルと、粒度の粗いデータ フローおよびパイプライン処理タスクのためのインプロセス メッセージ パッシングを推進する C++ テンプレート ライブラリです。 エージェント ライブラリは、同時実行ランタイムのスケジューリング コンポーネントとリソース管理コンポーネントに基づいています。

プログラミング モデル

エージェント ライブラリは、制御フローではなくデータ フローに基づく非同期通信モデルを使用して分離コンポーネントを接続できるようにすることで、共有状態に代わる手段を提供します。 データ フローは、必要なすべてのデータを使用できるときに計算が行われるプログラミング モデルです。制御フローは、あらかじめ決められた順序で計算が行われるプログラミング モデルです。

データ フロー プログラミング モデルは、プログラムの独立したコンポーネントがメッセージの送信によって相互に通信する、メッセージ パッシングの概念に関連しています。

エージェント ライブラリは、非同期エージェント、非同期メッセージ ブロック、およびメッセージ パッシング関数の 3 つのコンポーネントで構成されています。 エージェントは状態を保持し、メッセージ ブロックとメッセージ パッシング関数を使用して相互の通信および外部コンポーネントとの通信を行います。 メッセージ パッシング関数は、エージェントが外部コンポーネントとの間でメッセージを送受信できるようにします。 非同期メッセージ ブロックは、メッセージを保持し、エージェントが同期的に通信を行うことができるようにします。

次の図は、2 つのエージェントがメッセージ ブロックとメッセージ パッシング関数を使用して通信する方法を示しています。 この図で、agent1 は、Concurrency::send 関数と Concurrency::unbounded_buffer オブジェクトを使用して agent2 にメッセージを送信します。 agent2 は、Concurrency::receive 関数を使用してメッセージを読み取ります。 agent2 は、同じメソッドを使用して agent1 にメッセージを送信します。 破線の矢印は、エージェント間のデータの流れを表しています。 実線の矢印は、エージェントと、それが読み書きを行う対象のメッセージ ブロックを結んでいます。

エージェント ライブラリのコンポーネント

この図を実装するコード例については、このトピックで後述します。

エージェント プログラミング モデルには、他の同時実行機構および同期機構 (イベントなど) に比べていくつかの利点があります。 利点の 1 つは、メッセージ パッシングを使用してオブジェクト間で状態の変更を送信することで共有リソースへのアクセスを分離でき、スケーラビリティを向上できるという点です。 メッセージ パッシングの利点は、同期を外部同期オブジェクトに結び付けるのではなく、データに結び付けるという点にあります。 これにより、コンポーネント間のデータ伝送が簡略化され、アプリケーションのプログラミング エラーを防ぐことができます。

エージェント ライブラリを使用する場合

エージェント ライブラリは、相互に非同期通信を行う必要がある複数の操作がある場合に使用します。 メッセージ ブロックとメッセージ パッシング関数を使用すると、ロックなどの同期機構を必要とせずに並行アプリケーションを作成できます。 これにより、アプリケーション ロジックに集中することができます。

エージェント プログラミング モデルは、データ パイプラインまたはネットワークを作成する場合によく使用されます。 データ パイプラインは一連のコンポーネントで、その各コンポーネントは、より大きな目標を達成するための特定のタスクを実行します。 データフロー パイプラインのすべてのコンポーネントは、他のコンポーネントからメッセージを受け取ったときに処理を実行します。 その処理の結果は、パイプラインまたはネットワーク内の別のコンポーネントに渡されます。 コンポーネントは、並列パターン ライブラリ (PPL) などの他のライブラリの、より粒度の細かい同時実行機能を使用できます。

前述した図を実装する例を次に示します。

// basic-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// This agent writes a string to its target and reads an integer
// from its source.
class agent1 : public agent 
{
public:
   explicit agent1(ISource<int>& source, ITarget<wstring>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Send the request.
      wstringstream ss;
      ss << L"agent1: sending request..." << endl;
      wcout << ss.str();

      send(_target, wstring(L"request"));

      // Read the response.
      int response = receive(_source);

      ss = wstringstream();
      ss << L"agent1: received '" << response << L"'." << endl;
      wcout << ss.str();

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<int>& _source;
   ITarget<wstring>& _target;
};

// This agent reads a string to its source and then writes an integer
// to its target.
class agent2 : public agent 
{
public:
   explicit agent2(ISource<wstring>& source, ITarget<int>& target)
      : _source(source)
      , _target(target)
   {
   }

protected:
   void run()
   {
      // Read the request.
      wstring request = receive(_source);

      wstringstream ss;
      ss << L"agent2: received '" << request << L"'." << endl;
      wcout << ss.str();

      // Send the response.
      ss = wstringstream();
      ss << L"agent2: sending response..." << endl;
      wcout << ss.str();

      send(_target, 42);

      // Move the agent to the finished state.
      done();
   }

private:   
   ISource<wstring>& _source;
   ITarget<int>& _target;
};

int wmain()
{
   // Step 1: Create two message buffers to serve as communication channels
   // between the agents.

   // The first agent writes messages to this buffer; the second
   // agents reads messages from this buffer.
   unbounded_buffer<wstring> buffer1;

   // The first agent reads messages from this buffer; the second
   // agents writes messages to this buffer.
   overwrite_buffer<int> buffer2;

   // Step 2: Create the agents.
   agent1 first_agent(buffer2, buffer1);
   agent2 second_agent(buffer1, buffer2);

   // Step 3: Start the agents. The runtime calls the run method on
   // each agent.
   first_agent.start();
   second_agent.start();

   // Step 4: Wait for both agents to finish.
   agent::wait(&first_agent);
   agent::wait(&second_agent);
}

この例を実行すると、次の出力が生成されます。

agent1: sending request...
agent2: received 'request'.
agent2: sending response...
agent1: received '42'.

この例で使用されている機能の詳細については、次のトピックを参照してください。

関連トピック