Visão geral de BlockingCollection

BlockingCollection<T> é uma classe de coleção thread-safe que fornece os seguintes recursos:

  • Uma implementação do padrão de produtor-consumidor.

  • Adição e remoção simultâneas de itens de vários threads.

  • Capacidade máxima opcional.

  • Operações de inserção e remoção que bloqueiam quando a coleção está vazia ou cheia.

  • Operações “try” de inserção e remoção que não bloqueiam ou bloqueiam até um período específico.

  • Encapsula qualquer tipo de coleção que implemente IProducerConsumerCollection<T>

  • Cancelamento com tokens de cancelamento.

  • Dois tipos de enumeração com foreach (For Each no Visual Basic):

    1. Enumeração de somente leitura.

    2. Enumeração que remove itens conforme eles são enumerados.

Suporte a delimitação e bloqueio

BlockingCollection<T> dá suporte à delimitação e ao bloqueio. Delimitação significa que você pode definir a capacidade máxima da coleção. A delimitação é importante em determinados cenários, porque permite que você controle o tamanho máximo da coleção na memória e impede que os threads de produção se distanciem muito a frente dos threads de consumo.

Várias tarefas ou threads podem adicionar itens à coleção simultaneamente e se a coleção atingir sua capacidade máxima especificada, os threads de produção serão bloqueados até que um item seja removido. Vários consumidores podem remover itens simultaneamente e, se a coleção ficar vazia, os threads de consumo serão bloqueados até que um produtor adicione um item. Um thread de produção pode chamar CompleteAdding para indicar que não serão adicionados mais itens. Os consumidores monitoram a propriedade IsCompleted para saber quando a coleção está vazia e não serão adicionados mais itens. O exemplo a seguir mostra uma BlockingCollection simples com uma capacidade limitada igual a 100. Uma tarefa de produtor adiciona itens à coleção contanto que algumas condições sejam verdadeiras e, em seguida, chama CompleteAdding. A tarefa de consumidor tira itens até que a propriedade IsCompleted seja true.

// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
    while (!dataItems.IsCompleted)
    {

        Data data = null;
        // Blocks if dataItems.Count == 0.
        // IOE means that Take() was called on a completed collection.
        // Some other thread can call CompleteAdding after we pass the
        // IsCompleted check but before we call Take.
        // In this example, we can simply catch the exception since the
        // loop will break on the next iteration.
        try
        {
            data = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (data != null)
        {
            Process(data);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// A simple blocking producer with no cancellation.
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data data = GetData();
        // Blocks if numbers.Count == dataItems.BoundedCapacity
        dataItems.Add(data);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

' A bounded collection. It can hold no more 
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)

' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
                          While dataItems.IsCompleted = False
                              Dim dataItem As Data = Nothing
                              Try
                                  dataItem = dataItems.Take()
                              Catch e As InvalidOperationException
                                  ' IOE means that Take() was called on a completed collection.
                                  ' In this example, we can simply catch the exception since the 
                                  ' loop will break on the next iteration.
                              End Try
                              If (dataItem IsNot Nothing) Then
                                  Process(dataItem)
                              End If
                          End While
                          Console.WriteLine(vbCrLf & "No more items to take.")
                      End Sub)

' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
                          While moreItemsToAdd = True
                              Dim item As Data = GetData()

                              ' Blocks if dataItems.Count = dataItems.BoundedCapacity.
                              dataItems.Add(item)
                          End While

                          ' Let consumer know we are done.
                          dataItems.CompleteAdding()
                      End Sub)

Para obter um exemplo completo, consulte Como adicionar e tirar itens individualmente de uma BlockingCollection.

Operações de bloqueio cronometrado

Nas operações TryAdd e TryTake de bloqueio cronometrado em coleções limitadas, o método tenta adicionar ou tirar um item. Se houver um item disponível, ele será colocado na variável que foi passada por referência e o método retornará true. Se nenhum item for recuperado após um período de tempo limite especificado, o método retornará false. O thread fica, então, livre para realizar outro trabalho útil antes de tentar acessar a coleção novamente. Para obter um exemplo de acesso com bloqueio cronometrado, consulte o segundo exemplo em Como adicionar e tirar itens individualmente de uma BlockingCollection.

Cancelando as operações Add e Take

As operações Add e Take normalmente são realizadas em um loop. Você pode cancelar um loop, passando um CancellationToken para o método TryAdd ou TryTake e, em seguida, verificando o valor da propriedade IsCancellationRequested do token em cada iteração. Se o valor for true, caberá a você responder à solicitação de cancelamento limpando os recursos e saindo do loop. O exemplo a seguir mostra uma sobrecarga de TryAdd que usa um token de cancelamento e o código que ele usa:

do
{
    // Cancellation causes OCE. We know how to handle it.
    try
    {
        success = bc.TryAdd(itemToAdd, 2, ct);
    }
    catch (OperationCanceledException)
    {
        bc.CompleteAdding();
        break;
    }
    //...
} while (moreItems == true);
Do While moreItems = True
    ' Cancellation causes OCE. We know how to handle it.
    Try
        success = bc.TryAdd(itemToAdd, 2, ct)
    Catch ex As OperationCanceledException
        bc.CompleteAdding()
        Exit Do
    End Try
Loop

Para obter um exemplo de como adicionar suporte a cancelamento, veja o segundo exemplo em Instruções: adicionar e remover itens individualmente de uma BlockingCollection.

Especificando o tipo de coleção

Ao criar um BlockingCollection<T>, você pode especificar não apenas a capacidade limitada, mas também o tipo de coleção a ser usado. Por exemplo, seria possível especificar um ConcurrentQueue<T> para o comportamento PEPS (primeiro a entrar, primeiro a sair) ou um ConcurrentStack<T> para o comportamento UEPS (último a entrar, primeiro a sair). Você pode usar qualquer classe de coleção que implemente a interface IProducerConsumerCollection<T>. O tipo de coleção padrão para BlockingCollection<T> é ConcurrentQueue<T>. O exemplo de código a seguir mostra como criar um BlockingCollection<T> de cadeias de caracteres que tenha uma capacidade de 1000 e use um ConcurrentBag<T>:

Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)  
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );  

Para obter mais informações, consulte Instruções: adicionar as funcionalidades de limitação e bloqueio a uma coleção.

Suporte a IEnumerable

O BlockingCollection<T> fornece um método GetConsumingEnumerable que permite que os consumidores usem foreach (For Each em Visual Basic) para remover itens até que a coleção seja concluída, ou seja, que ela esteja vazia e que não sejam adicionados mais itens. Para obter mais informações, confira Como usar ForEach para remover itens de uma BlockingCollection.

Usando vários BlockingCollections como um

Para cenários em que um consumidor precisa remover itens de várias coleções simultaneamente, é possível criar matrizes de BlockingCollection<T> e usar os métodos estáticos como TakeFromAny e AddToAny que adicionarão ou retirarão de qualquer uma das coleções na matriz. Se uma coleção for de bloqueio, o método imediatamente tenta outra até encontrar uma que possa realizar a operação. Para obter mais informações, confira Como usar matrizes de coleções Blocking em um pipeline.

Confira também