逐步解說:使用聯結以避免死結

本主題會使用餐廳的用餐哲學家問題,說明如何使用 concurrency::join 類別,以防止您的應用程式中造成死結。在軟體應用程式中,如果有兩個以上的處理序各擁有一項資源,而互相等候另一個處理序釋放另一項資源,即會發生「死結」(Deadlock)。

我們常用哲學家用餐問題做為特定範例,說明多個並行處理序之間共用一組資源時可能會發生的一般問題集。

必要條件

在您開始閱讀此逐步解說前,請先參閱下列主題:

章節

此逐步解說包含下列章節:

  • 哲學家用餐問題

  • 直覺實作

  • 使用聯結以避免死結

哲學家用餐問題

哲學家用餐問題說明了應用程式中為何會發生死結。在此問題中,有五位哲學家圍坐在一個圓桌。每位哲學家各交替著思考與用餐的動作。每位哲學家都必須與左邊的哲學家共用一根筷子,與右邊的哲學家也必須共用一根筷子。下圖顯示其座位配置。

哲學家用餐問題

哲學家必須同時擁有兩根筷子,才能用餐。如果每個哲學家各有一根筷子,而都在等候另一根,則沒有哲學家可以用餐,大家都得挨餓。

Top

直覺實作

下列範例說明哲學家用餐問題的直覺實作。philosopher類別,這是衍生自 concurrency::agent,可讓每個獨立做哲學家。此範例會使用共用的陣列的 concurrency::critical_section 物件,取得每個philosopher chopsticks 一對獨占存取的物件。

我們以 philosopher 類別代表一個哲學家,說明實作與圖例的關聯性。int 變數代表每根筷子。critical_section 物件做為筷子的持有者。run 方法模擬哲學家的生命。think 方法模擬思考的動作,eat 方法則模擬用餐的動作。

philosopher 在呼叫 eat 方法前,會先同時鎖定前述兩個 critical_section 物件,以模擬從持有者手中拿走筷子的動作。在呼叫 eat 之後,philosopher 物件會將 critical_section 物件重新設為未鎖定的狀態,將筷子歸還給持有者。

pickup_chopsticks 方法說明可能會發生死結的情況。如果每個 philosopher 物件皆取得其中一個鎖定的存取權,則沒有任何 philosopher 物件可繼續動作,因為另一個鎖定皆由其他 philosopher 物件所控制。

範例

Dd742294.collapse_all(zh-tw,VS.110).gif程式碼

// philosophers-deadlock.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// A shared array of critical sections. Each critical section 
// guards access to a single chopstick.
critical_section locks[philosopher_count];

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(chopstick& left, chopstick& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         pickup_chopsticks(); 
         eat();
         send(_times_eaten, n+1);
         putdown_chopsticks();
      }

      done();
   }

   // Gains access to the chopsticks.
   void pickup_chopsticks()
   {
      // Deadlock occurs here if each philosopher gains access to one
      // of the chopsticks and mutually waits for another to release
      // the other chopstick.
      locks[_left].lock();
      locks[_right].lock();
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks()
   {
      locks[_right].unlock();
      locks[_left].unlock();
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   { 
      random_wait(100);
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      concurrency::wait(_random_generator()%max);
   }

private:
   // Index of the left chopstick in the chopstick array.
   chopstick& _left;
   // Index of the right chopstick in the chopstick array.
   chopstick& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of index values for the chopsticks.
   array<chopstick, philosopher_count> chopsticks = {0, 1, 2, 3, 4};

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

編譯程式碼

將範例程式碼複製並貼上它在 Visual Studio 專案中,或將它貼在檔名為哲學家 deadlock.cpp ,然後執行下列命令,Visual Studio 的命令提示字元] 視窗中。

cl.exe /EHsc philosophers-deadlock.cpp

Top

使用聯結以避免死結

本節說明如何使用訊息緩衝區和訊息傳遞函式消除死結的可能性。

要產生關聯與先前,這個範例philosopher類別會取代每個critical_section物件藉由使用 concurrency::unbounded_buffer 物件和join物件。join 物件做為將筷子提供給哲學家的仲裁者。

此範例會使用 unbounded_buffer 類別,因為當目標接收到來自 unbounded_buffer 物件的訊息時,該訊息會從訊息佇列中移除。這可讓保有訊息的 unbounded_buffer 物件指出筷子處於可用狀態。未獲得訊息的 unbounded_buffer 物件則會指出有人正在使用筷子。

此範例會使用非窮盡 (non-greedy) join 物件,因為非窮盡聯結只有在兩個 unbounded_buffer 物件都包含訊息時,才會給予每個 philosopher 物件兩根筷子的存取權。窮盡 (greedy) 聯結不會避免死結,因為只要一有訊息,窮盡聯結就會立即接受。如果所有窮盡 join 物件都接收其中一個訊息,但無止盡地等待另一個訊息變成可用訊息,就會發生死結。

如需窮盡和非窮盡聯結的詳細資訊,以及各種訊息緩衝區類型之間的差異資訊,請參閱非同步訊息區

若要在此範例中避免死結

  1. 從範例中移除下列程式碼。

    // A shared array of critical sections. Each critical section 
    // guards access to a single chopstick.
    critical_section locks[philosopher_count];
    
  2. philosopher 類別之 _left_right 資料成員的類型變更為 unbounded_buffer

    // Message buffer for the left chopstick.
    unbounded_buffer<chopstick>& _left;
    // Message buffer for the right chopstick.
    unbounded_buffer<chopstick>& _right;
    
  3. 修改 philosopher 建構函式,以 unbounded_buffer 物件做為其參數。

    explicit philosopher(unbounded_buffer<chopstick>& left, 
       unbounded_buffer<chopstick>& right, const wstring& name)
       : _left(left)
       , _right(right)
       , _name(name)
       , _random_generator(42)
    {
       send(_times_eaten, 0);
    }
    
  4. 修改 pickup_chopsticks 方法,以使用非窮盡 join 物件接收來自一雙筷子之訊息緩衝區的訊息。

    // Gains access to the chopsticks.
    vector<int> pickup_chopsticks()
    {
       // Create a non-greedy join object and link it to the left and right 
       // chopstick.
       join<chopstick, non_greedy> j(2);
       _left.link_target(&j);
       _right.link_target(&j);
    
       // Receive from the join object. This resolves the deadlock situation
       // because a non-greedy join removes the messages only when a message
       // is available from each of its sources.
       return receive(&j);
    }
    
  5. 修改 putdown_chopsticks 方法,將訊息傳送至一雙筷子的訊息緩衝區,以釋放筷子的存取權。

    // Releases the chopsticks for others.
    void putdown_chopsticks(int left, int right)
    {
       // Add the values of the messages back to the message queue.
       asend(&_left, left);
       asend(&_right, right);
    }
    
  6. 修改 run 方法以保有 pickup_chopsticks 方法的結果,並將這些結果傳遞至 putdown_chopsticks 方法。

    // Performs the main logic of the dining philosopher algorithm.
    void run()
    {
       // Repeat the thinks/eat cycle a set number of times.
       for (int n = 0; n < eat_count; ++n)
       {
          think();
    
          vector<int> v = pickup_chopsticks(); 
    
          eat();
    
          send(_times_eaten, n+1);
    
          putdown_chopsticks(v[0], v[1]);
       }
    
       done();
    }
    
  7. wmain 函式中的 chopsticks 變數宣告修改成各包含一則訊息之 unbounded_buffer 物件的陣列。

    // Create an array of message buffers to hold the chopsticks.
    array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;
    
    // Send a value to each message buffer in the array.
    // The value of the message is not important. A buffer that contains
    // any message indicates that the chopstick is available.
    for_each (begin(chopsticks), end(chopsticks), 
       [](unbounded_buffer<chopstick>& c) {
          send(c, 1);
    });
    

範例

Dd742294.collapse_all(zh-tw,VS.110).gif描述

以下顯示使用非窮盡 join 物件消除死結風險的完整範例。

Dd742294.collapse_all(zh-tw,VS.110).gif程式碼

// philosophers-join.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(unbounded_buffer<chopstick>& left, 
      unbounded_buffer<chopstick>& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         vector<int> v = pickup_chopsticks(); 

         eat();

         send(_times_eaten, n+1);

         putdown_chopsticks(v[0], v[1]);
      }

      done();
   }

   // Gains access to the chopsticks.
   vector<int> pickup_chopsticks()
   {
      // Create a non-greedy join object and link it to the left and right 
      // chopstick.
      join<chopstick, non_greedy> j(2);
      _left.link_target(&j);
      _right.link_target(&j);

      // Receive from the join object. This resolves the deadlock situation
      // because a non-greedy join removes the messages only when a message
      // is available from each of its sources.
      return receive(&j);
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks(int left, int right)
   {
      // Add the values of the messages back to the message queue.
      asend(&_left, left);
      asend(&_right, right);
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   {      
      random_wait(100);      
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      concurrency::wait(_random_generator()%max);
   }

private:
   // Message buffer for the left chopstick.
   unbounded_buffer<chopstick>& _left;
   // Message buffer for the right chopstick.
   unbounded_buffer<chopstick>& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of message buffers to hold the chopsticks.
   array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;

   // Send a value to each message buffer in the array.
   // The value of the message is not important. A buffer that contains
   // any message indicates that the chopstick is available.
   for_each (begin(chopsticks), end(chopsticks), 
      [](unbounded_buffer<chopstick>& c) {
         send(c, 1);
   });

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

Dd742294.collapse_all(zh-tw,VS.110).gif註解

這個範例產生下列輸出。

aristotle ate 50 times.
descartes ate 50 times.
hobbes ate 50 times.
socrates ate 50 times.
plato ate 50 times.

編譯程式碼

將範例程式碼複製並貼上它在 Visual Studio 專案中,或將它貼在檔名為哲學家 join.cpp ,然後執行下列命令,Visual Studio 的命令提示字元] 視窗中。

cl.exe /EHsc philosophers-join.cpp

Top

請參閱

概念

非同步代理程式程式庫

非同步代理程式

非同步訊息區

訊息傳遞函式

同步處理資料結構

其他資源

並行執行階段逐步解說