Nasıl yapılır: Gecikmeyi Dengelemek için Aşırı Aboneliği Kullanma

Fazla abonelik, yüksek gecikme süresine sahip görevler içeren bazı uygulamaların genel verimliliğini artırabilir. Bu konuda, ağ bağlantısından veri okumanın neden olduğu gecikme süresini dengelemek için oversubscription'ın nasıl kullanılacağı gösterilmektedir.

Örnek

Bu örnek, HTTP sunucularından dosya indirmek için Zaman Uyumsuz Aracılar Kitaplığı'nı kullanır. http_reader sınıfı concurrency::agent öğesinden türetilir ve indirilmesi gereken URL adlarını zaman uyumsuz olarak okumak için ileti geçirmeyi kullanır.

sınıfı, http_reader her dosyayı eşzamanlı olarak okumak için eşzamanlılık::task_group sınıfını kullanır. Her görev eşzamanlılık::Context::Oversubscribe yöntemini_BeginOversubscription, geçerli bağlamda oversubscription'ı etkinleştirmek için parametresi ayarlı olarak true çağırır. Her görev daha sonra dosyayı indirmek için Microsoft Foundation Sınıfları (MFC) CInternetSession ve CHttpFile sınıflarını kullanır. Son olarak, her görev fazla aboneliği devre dışı bırakmak için false parametresi olarak ayarlanmış olarak çağırır.Context::Oversubscribe _BeginOversubscription

Fazla abonelik etkinleştirildiğinde, çalışma zamanı görevlerin çalıştırıldığı bir iş parçacığı daha oluşturur. Bu iş parçacıklarının her biri geçerli bağlamı da aşabilir ve böylece ek iş parçacıkları oluşturabilir. sınıfı, http_reader uygulamanın kullandığı iş parçacığı sayısını sınırlamak için eşzamanlılık ::unbounded_buffer nesnesi kullanır. Aracı, sabit sayıda belirteç değeriyle arabelleği başlatır. Her indirme işlemi için aracı, işlem başlamadan önce arabellekten bir belirteç değeri okur ve işlem tamamlandıktan sonra bu değeri arabelleğe geri yazar. Arabellek boş olduğunda aracı, indirme işlemlerinden birinin arabelleğe geri bir değer yazmasını bekler.

Aşağıdaki örnek, eşzamanlı görev sayısını kullanılabilir donanım iş parçacığı sayısının iki katıyla sınırlar. Bu değer, fazla abonelikle deneme yaparken kullanmak için iyi bir başlangıç noktasıdır. Belirli bir işleme ortamına uyan bir değer kullanabilir veya bu değeri gerçek iş yüküne yanıt verecek şekilde dinamik olarak değiştirebilirsiniz.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

Bu örnek, dört işlemcisi olan bir bilgisayarda aşağıdaki çıkışı üretir:

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

Diğer görevler gizli bir işlemin bitmesini beklerken ek görevler çalıştırıldığından, fazla abonelik etkinleştirildiğinde örnek daha hızlı çalıştırılabilir.

Kod Derleniyor

Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya adlı download-oversubscription.cpp bir dosyaya yapıştırın ve ardından Visual Studio Komut İstemi penceresinde aşağıdaki komutlardan birini çalıştırın.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Güçlü Programlama

Artık gerekmedikten sonra her zaman fazla aboneliği devre dışı bırakın. Başka bir işlev tarafından oluşan bir özel durumu işlemeyen bir işlev düşünün. İşlev dönmeden önce fazla aboneliği devre dışı bırakmazsanız, ek paralel çalışmalar da geçerli bağlamı aşırı abone yapar.

Kaynak Alımı Başlatmadır (RAII) desenini kullanarak, fazla aboneliği belirli bir kapsamla sınırlandırabilirsiniz. RAII deseni altında, yığında bir veri yapısı ayrılır. Bu veri yapısı oluşturulduğunda bir kaynağı başlatır veya alır ve veri yapısı yok edildiğinde bu kaynağı yok eder veya serbest bırakır. RAII deseni, kapsayan kapsam çıkmadan önce yıkıcının çağrılmasını garanti eder. Bu nedenle, bir özel durum oluştuğunda veya bir işlev birden çok return deyim içerdiğinde kaynak doğru şekilde yönetilir.

Aşağıdaki örnek, adlı scoped_blocking_signalbir yapıyı tanımlar. Yapı oluşturucu scoped_blocking_signal , fazla abonelik sağlar ve yıkıcı oversubscription devre dışı bırakır.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

Aşağıdaki örnek, işlev döndürmeden önce fazla aboneliğin download devre dışı bırakılmasını sağlamak için yöntemin gövdesini RAII kullanacak şekilde değiştirir. Bu teknik, yönteminin download özel durum açısından güvenli olmasını sağlar.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Ayrıca bkz.

Bağlamlar
Context::Oversubscribe Yöntemi