同時実行ランタイムでの例外処理

同時実行ランタイムは、C++ 例外処理を使用してさまざまなエラーを通知します。 そのエラーには、ランタイムの不適切な使用、リソースの取得の失敗などのランタイム エラー、タスクおよびタスク グループに提供した処理関数で生じるエラーなどがあります。 タスクまたはタスク グループによって例外がスローされると、ランタイムはその例外を保持し、タスクまたはタスク グループの完了を待機するコンテキストにマーシャリングします。 軽量タスクやエージェントなどのコンポーネントの例外は、ランタイムによって自動的には管理されません。 そのため、独自の例外処理機構を実装する必要があります。 このトピックでは、タスク、タスク グループ、軽量タスク、および非同期エージェントによってスローされた例外をランタイムが処理するしくみと、アプリケーションで例外に応答する方法を説明します。

主要なポイント

  • タスクまたはタスク グループによって例外がスローされると、ランタイムはその例外を保持し、タスクまたはタスク グループの完了を待機するコンテキストにマーシャリングします。

  • 可能であれば、回復できるエラーを処理するために、concurrency::task::get および concurrency::task::wait への各呼び出しを try/catch ブロックで囲みます。 タスクが例外をスローし、その例外がそのタスク、その継続の 1 つ、またはメイン アプリケーションによってキャッチされない場合、ランタイムはアプリケーションを終了します。

  • タスク ベースの継続は常に実行されます。継続元タスクが正常に完了したかどうか、例外をスローしたかどうか、または取り消されたかどうかは、関係ありません。 値ベースの継続は、継続元タスクがスローまたは取り消すと、実行されません。

  • タスク ベースの継続は常に実行されるため、継続チェーンの末尾にタスク ベースの継続を追加することを検討してください。 これにより、コードですべての例外が確認されることを保証できます。

  • concurrency::task::get を呼び出し、そのタスクが取り消されると、ランタイムは concurrency::task_canceled をスローします。

  • ランタイムは、軽量タスクと軽量エージェントの例外を管理しません。

目次

  • タスクおよび継続

  • タスク グループと並列アルゴリズム

  • ランタイムによってスローされる例外

  • 複数の例外

  • キャンセル

  • 軽量タスク

  • 非同期エージェント

タスクおよび継続

ここでは、concurrency::task オブジェクトおよびその継続によってスローされた例外をランタイムが処理するしくみについて説明します。 タスクおよび継続モデルの詳細については、「タスクの並列化 (同時実行ランタイム)」を参照してください。

task オブジェクトに渡す処理関数の本体で例外をスローすると、ランタイムはその例外を保存して、concurrency::task::get または concurrency::task::wait を呼び出すコンテキストにその例外をマーシャリングします。 ドキュメント「タスクの並列化 (同時実行ランタイム)」は、タスク ベースの継続と値ベースの継続について説明したものです。要約すると、値ベースの継続は T 型のパラメーターを受け取り、タスク ベースの継続は task<T> 型のパラメーターを受け取ります。 スローするタスクに 1 つ以上の値ベースの継続がある場合、それらの継続を実行するスケジュールは設定されません。 この動作を次の例に示します。

// eh-task.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]
    {
        throw exception();
    });

    // Create a continuation that prints its input value.
    auto continuation = t.then([]
    {
        // We do not expect this task to run because 
        // the antecedent task threw.
        wcout << L"In continuation task..." << endl;
    });

    // Wait for the continuation to finish and handle any  
    // error that occurs. 
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        continuation.wait();

        // Alternatively, call get() to produce the same result. 
        //continuation.get();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception.
*/

タスク ベースの継続では、継続元タスクによってスローされるすべての例外を処理することができます。 タスク ベースの継続は常に実行されます。そのタスクが正常に完了したかどうか、例外をスローしたかどうか、または取り消されたかどうかは、関係ありません。 タスクが例外をスローする場合、タスク ベースの継続を実行するようにスケジュールが設定されます。 次の例は、常にスローするタスクを示しています。 タスクには 2 つの継続があります。1 つが値ベースで他方がタスク ベースです。 タスク ベースの例外は、常に実行されるため、継続元タスクによってスローされる例外をキャッチすることができます。 task::get または task::wait が呼び出されると、タスクの例外が常にスローされます。そのため、例が両方の継続の完了を待機しているときに、例外が再度スローされます。

// eh-continuations.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{    
    wcout << L"Running a task..." << endl;
    // Create a task that throws.
    auto t = create_task([]() -> int
    {
        throw exception();
        return 42;
    });

    // 
    // Attach two continuations to the task. The first continuation is   
    // value-based; the second is task-based. 

    // Value-based continuation.
    auto c1 = t.then([](int n)
    {
        // We don't expect to get here because the antecedent  
        // task always throws.
        wcout << L"Received " << n << L'.' << endl;
    });

    // Task-based continuation.
    auto c2 = t.then([](task<int> previousTask)
    {
        // We do expect to get here because task-based continuations 
        // are scheduled even when the antecedent task throws. 
        try
        {
            wcout << L"Received " << previousTask.get() << L'.' << endl;
        }
        catch (const exception& e)
        {
            wcout << L"Caught exception from previous task." << endl;
        }
    });

    // Wait for the continuations to finish. 
    try
    {
        wcout << L"Waiting for tasks to finish..." << endl;
        (c1 && c2).wait();
    }
    catch (const exception& e)
    {
        wcout << L"Caught exception while waiting for all tasks to finish." << endl;
    }
}
/* Output:
    Running a task...
    Waiting for tasks to finish...
    Caught exception from previous task.
    Caught exception while waiting for all tasks to finish.
*/

処理できる例外をキャッチするために、タスク ベースの継続を使用することをお勧めします。 タスク ベースの継続は常に実行されるため、継続チェーンの末尾にタスク ベースの継続を追加することを検討してください。 これにより、コードですべての例外が確認されることを保証できます。 次の例は、基本的な値ベースの継続チェーンを示しています。 このチェーン スローの 3 番目のタスクおよび後続の値ベースの継続は、実行されません。 ただし、最後の継続は、タスク ベースであるため常に実行されます。 この最後の継続は、3 番目のタスクによってスローされる例外を処理します。

できるだけ具体性の高い例外をキャッチすることをお勧めします。 キャッチする具体的な例外がない場合は、この最後のタスク ベースの継続を省略できます。 この場合、すべての例外が未処理になり、アプリケーションが停止する可能性があります。

// eh-task-chain.cpp 
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    int n = 1;
    create_task([n]
    {
        wcout << L"In first task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](int n)
    {
        wcout << L"In second task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](int n)
    {
        wcout << L"In third task. n = ";
        wcout << n << endl;

        // This task throws. 
        throw exception();
        // Not reached. 
        return n * 2;

    }).then([](int n)
    {
        // This continuation is not run because the previous task throws.
        wcout << L"In fourth task. n = ";
        wcout << n << endl;

        return n * 2;

    }).then([](task<int> previousTask)
    {
        // This continuation is run because it is value-based. 
        try
        {
            // The call to task::get rethrows the exception.
            wcout << L"In final task. result = ";
            wcout << previousTask.get() << endl;
        }
        catch (const exception&)
        {
            wcout << L"<exception>" << endl;
        }
    }).wait();
}
/* Output:
    In first task. n = 1
    In second task. n = 2
    In third task. n = 4
    In final task. result = <exception>
*/

ヒント

concurrency::task_completion_event::set_exception メソッドを使用して、タスクの完了イベントと例外を関連付けることができます。ドキュメント「タスクの並列化 (同時実行ランタイム)」では、concurrency::task_completion_event クラスについて詳しく説明されています。

concurrency::task_canceled は、task に関連する重要なランタイム例外の型です。 task::get が呼び出され、そのタスクが取り消された場合、ランライムは task_canceled をスローします(逆に、task::waittask_status::canceled を返し、スローしません)。タスク ベースの継続から、この例外をキャッチして処理できます。または、task::get を呼び出すと、この例外をキャッチして処理できます。 タスクのキャンセルの詳細については、「PPL における取り消し処理」を参照してください。

注意

コードから task_canceled をスローしないでください。代わりに concurrency::cancel_current_task を呼び出してください。

タスクが例外をスローし、その例外がそのタスク、その継続の 1 つ、またはメイン アプリケーションによってキャッチされない場合、ランタイムはアプリケーションを終了します。 アプリケーションがクラッシュした場合、C++ の例外がスローされると中断するように Visual Studio を構成できます。 未処理の例外の場所を診断したら、タスク ベースの継続を使用してそれを処理します。

このドキュメントの「ランタイムによってスローされる例外」セクションでは、ランタイム例外の処理方法について詳細に説明されています。

[トップ]

タスク グループと並列アルゴリズム

ここでは、タスク グループによってスローされた例外をランタイムが処理するしくみについて説明します。 このセクションの内容は、concurrency::parallel_for をはじめとする並列アルゴリズムにも当てはまります。これらのアルゴリズムはタスク グループに基づいて作成されているためです。

注意

例外が依存タスクに及ぼす影響を十分に理解しておいてください。タスクまたは並列アルゴリズムによる例外処理を使用する場合のベスト プラクティスについては、「並列パターン ライブラリに関するベスト プラクティス」の「取り消し処理および例外処理がオブジェクトの破棄に及ぼす影響について」を参照してください。

タスク グループの詳細については、「タスクの並列化 (同時実行ランタイム)」を参照してください。 並列アルゴリズムの詳細については、「並列アルゴリズム」を参照してください。

concurrency::task_group オブジェクトまたは concurrency::structured_task_group オブジェクトに渡す処理関数の本体で例外をスローすると、ランタイムはその例外を保存し、concurrency::task_group::waitconcurrency::structured_task_group::waitconcurrency::task_group::run_and_wait、または concurrency::structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。 また、ランタイムは、タスク グループ内のすべてのアクティブ タスク (子タスク グループ内のタスクも含む) を中止すると共に、開始されていないすべてのタスクを破棄します。

次の例は、例外をスローする処理関数の基本的な構造を示しています。 この例では、task_group オブジェクトを使用して 2 つの point オブジェクトの値を並列的に出力します。 print_point 処理関数は point オブジェクトの値をコンソールに出力します。 入力値が NULL の場合、処理関数は例外をスローします。 ランタイムはこの例外を保存し、task_group::wait を呼び出すコンテキストにその例外をマーシャリングします。

// eh-task-group.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// Defines a basic point with X and Y coordinates. 
struct point
{
   int X;
   int Y;
};

// Prints the provided point object to the console. 
void print_point(point* pt)
{
   // Throw an exception if the value is NULL. 
   if (pt == NULL)
   {
      throw exception("point is NULL.");
   }

   // Otherwise, print the values of the point.
   wstringstream ss;
   ss << L"X = " << pt->X << L", Y = " << pt->Y << endl;
   wcout << ss.str();
}

int wmain()
{
   // Create a few point objects.
   point pt = {15, 30};
   point* pt1 = &pt;
   point* pt2 = NULL;

   // Use a task group to print the values of the points.
   task_group tasks;

   tasks.run([&] {
      print_point(pt1);
   });

   tasks.run([&] {
      print_point(pt2);
   });

   // Wait for the tasks to finish. If any task throws an exception, 
   // the runtime marshals it to the call to wait. 
   try
   {
      tasks.wait();
   }
   catch (const exception& e)
   {
      wcerr << L"Caught exception: " << e.what() << endl;
   }
}

この例を実行すると、次の出力が生成されます。

  

タスク グループの例外処理を使用した詳細な例については、「方法: 例外処理を使用して並列ループを中断する」を参照してください。

[トップ]

ランタイムによってスローされる例外

ランタイムの呼び出しから例外が発生する可能性があります。 concurrency::task_canceledconcurrency::operation_timed_out を除き、例外のほとんどの型は、プログラミング エラーを示します。 一般にこれらのエラーは回復不能であるため、アプリケーション コードではキャッチまたは処理できません。 プログラミング エラーを診断する必要がある場合にのみ、回復不能なエラーをアプリケーション コードでキャッチまたは処理することをお勧めします。 ただし、ランタイムで定義されている例外の種類を把握しておけば、プログラミング エラーを診断するときに役立ちます。

ランタイムによってスローされる例外の例外処理機構は、処理関数によってスローされる例外の例外処理機構と同じです。 たとえば、concurrency::receive 関数は、指定の時間内にメッセージを受信しなかった場合、operation_timed_out をスローします。 タスク グループに渡す処理関数で receive が例外をスローすると、ランタイムはその例外を保存して、task_group::waitstructured_task_group::waittask_group::run_and_wait、または structured_task_group::run_and_wait を呼び出すコンテキストにその例外をマーシャリングします。

次の例では、concurrency::parallel_invoke アルゴリズムを使用して、2 つのタスクを並列に実行します。 1 つ目のタスクは 5 秒間待機した後、メッセージをメッセージ バッファーに送信します。 2 つ目のタスクは receive 関数を使用して 3 秒間待機し、同じメッセージ バッファーからメッセージを受信します。 receive 関数は、時間内にメッセージを受信しなかった場合、operation_timed_out をスローします。

// eh-time-out.cpp 
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   single_assignment<int> buffer;
   int result;

   try
   {
      // Run two tasks in parallel.
      parallel_invoke(
         // This task waits 5 seconds and then sends a message to  
         // the message buffer.
         [&] {
            wait(5000); 
            send(buffer, 42);
         },
         // This task waits 3 seconds to receive a message. 
         // The receive function throws operation_timed_out if it does  
         // not receive a message in the specified time period.
         [&] {
            result = receive(buffer, 3000);
         }
      );

      // Print the result.
      wcout << L"The result is " << result << endl;
   }
   catch (operation_timed_out&)
   {
      wcout << L"The operation timed out." << endl;
   }
}

この例を実行すると、次の出力が生成されます。

  

アプリケーションの異常終了を防ぐために、コードでランタイムを呼び出す場合は例外が処理されるようにしてください。 また、サードパーティのライブラリなど、同時実行ランタイムを使用する外部コードを呼び出す場合にも、例外を処理する必要があります。

[トップ]

複数の例外

タスクまたは並列アルゴリズムが複数の例外を受け取った場合、ランタイムはそのいずれか 1 つだけを呼び出し元のコンテキストにマーシャリングします。 どの例外がマーシャリングされるかは、任意です。

次の例では、parallel_for アルゴリズムを使用して、数値をコンソールに出力します。 入力値が指定の最小値未満であるか、最大値を超えている場合、例外がスローされます。 この例では、複数の処理関数が例外をスローする可能性があります。

// eh-multiple.cpp 
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

int wmain()
{
   const int min = 0;
   const int max = 10;

   // Print values in a parallel_for loop. Use a try-catch block to  
   // handle any exceptions that occur in the loop. 
   try
   {
      parallel_for(-5, 20, [min,max](int i)
      {
         // Throw an exeception if the input value is less than the  
         // minimum or greater than the maximum. 

         // Otherwise, print the value to the console. 

         if (i < min)
         {
            stringstream ss;
            ss << i << ": the value is less than the minimum.";
            throw exception(ss.str().c_str());
         }
         else if (i > max)
         {
            stringstream ss;
            ss << i << ": the value is greater than than the maximum.";
            throw exception(ss.str().c_str());
         }
         else
         {
            wstringstream ss;
            ss << i << endl;
            wcout << ss.str();
         }
      });
   }
   catch (exception& e)
   {
      // Print the error to the console.
      wcerr << L"Caught exception: " << e.what() << endl;
   }  
}

この例のサンプル出力を次に示します。

  

[トップ]

キャンセル

すべての例外がエラーの存在を示すわけではありません。 たとえば、検索アルゴリズムは結果を検出したときに、例外処理を使用して、関連付けられているタスクを中止することがあります。 取り消しの機構の使用方法の詳細については、「PPL における取り消し処理」を参照してください。

[トップ]

軽量タスク

軽量タスクは、concurrency::Scheduler オブジェクトから直接スケジュールするタスクです。 軽量タスクは、通常のタスクよりもオーバーヘッドが小さくなります。 ただし、ランタイムは軽量タスクによってスローされた例外をキャッチしません。 代わりに、ハンドルされない例外のハンドラーが例外をキャッチし、既定ではプロセスを終了します。 したがって、アプリケーションで適切なエラー処理機構を使用する必要があります。 軽量タスクの詳細については、「タスク スケジューラ (同時実行ランタイム)」を参照してください。

[トップ]

非同期エージェント

軽量タスクと同様に、ランタイムは非同期エージェントによってスローされた例外を管理しません。

concurrency::agent から派生するクラスで例外を処理する方法の一例を次に示します。 この例では、points_agent クラスを定義しています。 points_agent::run メソッドはメッセージ バッファーから point オブジェクトを読み取り、それをコンソールに出力します。 run メソッドは、NULL ポインターを受け取った場合に例外をスローします。

run メソッドでは、すべての処理が try-catch ブロックに内包されています。 catch ブロックは、例外をメッセージ バッファーに格納します。 アプリケーションは、エージェントの終了後にこのバッファーから例外を読み取ることで、エージェントでのエラーの有無をチェックします。

// eh-agents.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Defines a point with x and y coordinates. 
struct point
{
   int X;
   int Y;
};

// Informs the agent to end processing.
point sentinel = {0,0};

// An agent that prints point objects to the console. 
class point_agent : public agent
{
public:
   explicit point_agent(unbounded_buffer<point*>& points)
      : _points(points)
   { 
   }

   // Retrieves any exception that occurred in the agent. 
   bool get_error(exception& e)
   {
      return try_receive(_error, e);
   }

protected:
   // Performs the work of the agent. 
   void run()
   {
      // Perform processing in a try block. 
      try
      {
         // Read from the buffer until we reach the sentinel value. 
         while (true)
         {
            // Read a value from the message buffer.
            point* r = receive(_points);

            // In this example, it is an error to receive a  
            // NULL point pointer. In this case, throw an exception. 
            if (r == NULL)
            {
               throw exception("point must not be NULL");
            }
            // Break from the loop if we receive the  
            // sentinel value. 
            else if (r == &sentinel)
            {
               break;
            }
            // Otherwise, do something with the point. 
            else
            {
               // Print the point to the console.
               wcout << L"X: " << r->X << L" Y: " << r->Y << endl;
            }
         }
      }
      // Store the error in the message buffer. 
      catch (exception& e)
      {
         send(_error, e);
      }

      // Set the agent status to done.
      done();
   }

private:
   // A message buffer that receives point objects.
   unbounded_buffer<point*>& _points;

   // A message buffer that stores error information.
   single_assignment<exception> _error;
};

int wmain()
{  
   // Create a message buffer so that we can communicate with 
   // the agent.
   unbounded_buffer<point*> buffer;

   // Create and start a point_agent object.
   point_agent a(buffer);
   a.start();

   // Send several points to the agent.
   point r1 = {10, 20};
   point r2 = {20, 30};
   point r3 = {30, 40};

   send(buffer, &r1);
   send(buffer, &r2);
   // To illustrate exception handling, send the NULL pointer to the agent.
   send(buffer, reinterpret_cast<point*>(NULL));
   send(buffer, &r3);
   send(buffer, &sentinel);

   // Wait for the agent to finish.
   agent::wait(&a);

   // Check whether the agent encountered an error.
   exception e;
   if (a.get_error(e))
   {
      cout << "error occurred in agent: " << e.what() << endl;
   }

   // Print out agent status.
   wcout << L"the status of the agent is: ";
   switch (a.status())
   {
   case agent_created:
      wcout << L"created";
      break;
   case agent_runnable:
      wcout << L"runnable";
      break;
   case agent_started:
      wcout << L"started";
      break;
   case agent_done:
      wcout << L"done";
      break;
   case agent_canceled:
      wcout << L"canceled";
      break;
   default:
      wcout << L"unknown";
      break;
   }
   wcout << endl;
}

この例を実行すると、次の出力が生成されます。

  

try-catch ブロックは while ループの外側にあるため、エージェントは最初のエラーを検出した時点で処理を終了します。 try-catch ブロックが while ループの内側にある場合は、エージェントはエラーの発生後も処理を継続できます。

この例では例外がメッセージ バッファーに格納されるため、別のコンポーネントが実行中のエージェントのエラーを監視できます。 この例では、エラーを保存するのに concurrency::single_assignment オブジェクトを使用します。 エージェントが複数の例外を処理する場合、single_assignment クラスは渡された最初のメッセージだけを保存します。 最後の例外だけを保存するには、concurrency::overwrite_buffer クラスを使用します。 例外をすべて保存するには、concurrency::unbounded_buffer クラスを使用します。 これらのメッセージ ブロックの詳細については、「非同期メッセージ ブロック」を参照してください。

非同期エージェントの詳細については、「非同期エージェント」を参照してください。

[トップ]

要約

[トップ]

参照

概念

同時実行ランタイム

タスクの並列化 (同時実行ランタイム)

並列アルゴリズム

PPL における取り消し処理

タスク スケジューラ (同時実行ランタイム)

非同期エージェント