Gewusst wie: Implementieren einer kooperativen Semaphore mithilfe der Context-Klasse

In diesem Thema wird gezeigt, wie Sie die Parallelitätsklasse::Context verwenden, um eine kooperative Semaphorklasse zu implementieren.

Hinweise

Mit der Context-Klasse können Sie den aktuellen Ausführungskontext blockieren oder zurückhalten. Das Blockieren oder Zurückhalten des aktuellen Kontexts ist nützlich, wenn der aktuelle Kontext nicht fortfahren kann, da eine Ressource nicht verfügbar ist. Ein Semaphor ist ein Beispiel für eine Situation, in der der aktuelle Ausführungskontext warten muss, bis eine Ressource verfügbar ist. Eine Semaphore ist wie ein kritisches Abschnittsobjekt ein Synchronisierungsobjekt, das dem Code in einem Kontext ermöglicht, exklusiv auf eine Ressource zuzugreifen. Im Gegensatz zu einem kritischen Abschnittsobjekt ermöglicht eine Semaphore jedoch mehr als einem Kontext, gleichzeitig auf die Ressource zuzugreifen. Wenn die maximale Anzahl von Kontexten eine Semaphorensperre hat, muss jeder zusätzliche Kontext warten, bis ein anderer Kontext die Sperre aufhebt.

So implementieren Sie die Semaphorenklasse

  1. Deklarieren Sie eine Klasse mit dem Namen semaphore. Fügen Sie der Klasse einen public-Abschnitt und einen private-Abschnitt hinzu.
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
  1. Deklarieren Sie im private Abschnitt der semaphore Klasse eine std::atomic-Variable , die die Anzahl der Semaphore und ein Parallelitätsobjekt::concurrent_queue enthält, das die Kontexte enthält, die warten müssen, um das Semaphor zu erhalten.
// The semaphore count.
atomic<long long> _semaphore_count;

// A concurrency-safe queue of contexts that must wait to 
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
  1. Implementieren Sie im public-Abschnitt der semaphore-Klasse den Konstruktor. Der Konstruktor akzeptiert einen long long-Wert, der die maximale Anzahl von Kontexten angibt, die gleichzeitig über die Sperre verfügen können.
explicit semaphore(long long capacity)
   : _semaphore_count(capacity)
{
}
  1. Implementieren Sie im public-Abschnitt der semaphore-Klasse die acquire-Methode. Diese Methode dekrementiert die Semaphorenanzahl als atomaren Vorgang. Wenn die Anzahl der Semaphore negativ wird, fügen Sie den aktuellen Kontext am Ende der Warteschleife hinzu, und rufen Sie die Parallelität::Context::Block-Methode auf, um den aktuellen Kontext zu blockieren.
// Acquires access to the semaphore.
void acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (--_semaphore_count < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}
  1. Implementieren Sie im public-Abschnitt der semaphore-Klasse die release-Methode. Diese Methode inkrementiert die Semaphorenanzahl als atomaren Vorgang. Wenn die Semaphorenanzahl vor dem Inkrementieren negativ ist, gibt es mindestens einen Kontext, der auf die Sperre wartet. Entsperren Sie in diesem Fall den Kontext, der sich am Anfang der Warteschlange befindet.
// Releases access to the semaphore.
void release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (++_semaphore_count <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      while (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

Beispiel

Die semaphore-Klasse in diesem Beispiel verhält sich kooperativ, da die Context::Block-Methode und Context::Yield-Methode die Ausführung zurückhalten, damit die Laufzeit andere Aufgaben ausführen kann.

Die acquire-Methode dekrementiert den Zähler, es kann jedoch sein, dass sie das Hinzufügen des Kontexts zur Warteschlange noch nicht abgeschlossen hat, bevor ein anderer Kontext die release-Methode aufruft. Um dies zu berücksichtigen, verwendet die release Methode eine Drehschleife, die die Parallelität::Context::Yield-Methode aufruft, um zu warten, bis die acquire Methode das Hinzufügen des Kontexts abgeschlossen hat.

Die release-Methode kann die Context::Unblock-Methode aufrufen, bevor die acquire-Methode die Context::Block-Methode aufruft. Sie müssen keinen Schutz vor dieser Racebedingung implementieren, da die Laufzeit diesen Methoden ermöglicht, in beliebiger Reihenfolge aufgerufen zu werden. Wenn die release-Methode Context::Unblock aufruft, bevor die acquire-Methode Context::Block für den gleichen Kontext aufruft, bleibt dieser Kontext weiterhin unblockiert. Die Laufzeit erfordert nur, dass für jeden Aufruf von Context::Block ein entsprechender Aufruf von Context::Unblock vorhanden ist.

Im folgenden Beispiel wird die vollständige semaphore-Klasse dargestellt. Die wmain-Funktion zeigt die grundlegende Verwendung dieser Klasse. Die wmain Funktion verwendet die Parallelität::p arallel_for-Algorithmus , um mehrere Aufgaben zu erstellen, die Zugriff auf das Semaphor erfordern. Da drei Threads jederzeit über die Sperre verfügen können, müssen einige Aufgaben warten, bis eine andere Aufgabe abgeschlossen ist und die Sperre freigibt.

// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to 
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

Dieses Beispiel erzeugt die folgende Beispielausgabe.

In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...

Weitere Informationen zur concurrent_queue Klasse finden Sie unter Parallel Containers and Objects. Weitere Informationen zum parallel_for Algorithmus finden Sie unter Parallel-Algorithmen.

Kompilieren des Codes

Kopieren Sie den Beispielcode, fügen Sie ihn in ein Visual Studio-Projekt ein, oder fügen Sie ihn in eine Datei ein, die benannt cooperative-semaphore.cpp ist, und führen Sie dann den folgenden Befehl in einem Visual Studio-Eingabeaufforderungsfenster aus.

cl.exe /EHsc cooperative-semaphore.cpp

Stabile Programmierung

Sie können das RAII-Muster (Resource Acquisition Is Initialization ) verwenden, um den Zugriff auf ein semaphore Objekt auf einen bestimmten Bereich zu beschränken. Unter dem RAII-Muster wird dem Stapel eine Datenstruktur zugeordnet. Diese Datenstruktur initialisiert oder ruft eine Ressource ab, wenn sie erstellt wird, und zerstört oder gibt diese Ressource frei, wenn die Datenstruktur zerstört wird. Das RAII-Muster garantiert, dass der Destruktor aufgerufen wird, bevor der einschließende Bereich beendet wird. Daher wird die Ressource ordnungsgemäß verwaltet, wenn eine Ausnahme ausgelöst wird, oder wenn eine Funktion mehrere return-Anweisungen enthält.

Im folgenden Beispiel wird eine Klasse mit dem Namen scoped_lock definiert, die im public-Abschnitt der semaphore-Klasse definiert ist. Die scoped_lock Klasse ähnelt der Parallelität::critical_section::scoped_lock und Parallelität::reader_writer_lock::scoped_lock Klassen. Der Konstruktor der semaphore::scoped_lock-Klasse erhält Zugriff auf das angegebene semaphore-Objekt, und der Destruktor gibt den Zugriff auf dieses Objekt frei.

// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

Im folgenden Beispiel wird der Text der Arbeitsfunktion geändert, die an den parallel_for-Algorithmus übergeben wird, damit RAII verwendet wird um sicherzustellen, dass die Semaphore vor der Rückkehr der Funktion freigegeben wird. Durch diese Methode wird sichergestellt, dass die Arbeitsfunktion sicher vor Ausnahmen ist.

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock 
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

Siehe auch

Kontexte
Parallele Container und Objekte