Nasıl yapılır: Üretici-tüketici veri akışı deseni uygulama

Bu makalede, üretici-tüketici deseni uygulamak için TPL veri akışı kitaplığını kullanmayı öğreneceksiniz. Bu düzende , üretici bir ileti bloğuna ileti gönderir ve tüketici bu bloktaki iletileri okur.

Not

TPL Veri Akışı Kitaplığı ( System.Threading.Tasks.Dataflow ad alanı) .NET ile dağıtılmaz. Ad alanını System.Threading.Tasks.Dataflow Visual Studio'ya yüklemek için projenizi açın, Proje menüsünden NuGet Paketlerini Yönet'i seçin ve çevrimiçi ortamda System.Threading.Tasks.Dataflow paketi arayın. Alternatif olarak, .NET Core CLI kullanarak yüklemek için komutunu çalıştırındotnet add package System.Threading.Tasks.Dataflow.

Örnek

Aşağıdaki örnekte veri akışını kullanan temel bir üretici-tüketici modeli gösterilmektedir. yöntemi, Produce bir nesneye System.Threading.Tasks.Dataflow.ITargetBlock<TInput> rastgele veri baytları içeren diziler yazar ve Consume yöntem bir System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> nesneden bayt okur. ve ITargetBlock<TInput> arabirimleri üzerinde ISourceBlock<TOutput> işlem yaparak, türetilmiş türleri yerine, çeşitli veri akışı blok türleri üzerinde işlem yapabilecek yeniden kullanılabilir kod yazabilirsiniz. Bu örnekte sınıfı kullanılır BufferBlock<T> . BufferBlock<T> sınıfı hem kaynak blok hem de hedef blok olarak davrandığından, üretici ve tüketici verileri aktarmak için paylaşılan bir nesne kullanabilir.

yöntemi, Produce hedef bloğa Post zaman uyumlu bir şekilde veri yazmak için yöntemini bir döngüde çağırır. Produce yöntemi tüm verileri hedef bloğa yazdıktan sonra, bloğun Complete hiçbir zaman kullanılabilir ek veriye sahip olmadığını belirtmek için yöntemini çağırır. yöntemi, nesneden ISourceBlock<TOutput> alınan toplam bayt sayısını zaman uyumsuz olarak hesaplamak için zaman uyumsuz ve await işleçlerini (Async ve Await in Visual Basic) kullanır.Consume Zaman uyumsuz olarak hareket etmek için yöntem, Consume kaynak bloğun OutputAvailableAsync kullanılabilir verileri olduğunda ve kaynak bloğunda hiçbir zaman ek veri olmadığında bildirim almak için yöntemini çağırır.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Sağlam programlama

Yukarıdaki örnek, kaynak verileri işlemek için yalnızca bir tüketici kullanır. Uygulamanızda birden çok tüketici varsa, aşağıdaki örnekte gösterildiği gibi kaynak bloktaki verileri okumak için yöntemini kullanın TryReceive .

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Yöntem TryReceive , kullanılabilir veri olmadığında döndürür False . Birden çok tüketicinin kaynak bloğuna eşzamanlı olarak erişmesi gerektiğinde, bu mekanizma çağrısından OutputAvailableAsyncsonra verilerin hala kullanılabilir olmasını garanti eder.

Ayrıca bkz.