Como: Adicionar e remover itens individualmente de uma BlockingCollection
Este exemplo mostra como adicionar e remover itens de uma BlockingCollection<T>, tanto com bloqueio quanto sem bloqueio. Para obter mais informações sobre BlockingCollection<T>, veja Visão geral de BlockingCollection.
Para obter um exemplo de como enumerar uma BlockingCollection<T> até que ela fique vazia e não existam mais elementos a adicionar, veja Como usar ForEach para remover itens de uma BlockingCollection.
Exemplo 1
Este primeiro exemplo mostra como adicionar e remover itens de forma que as operações sejam bloqueadas se a coleção estiver temporariamente vazia (ao remover) ou na capacidade máxima (ao adicionar), ou se um período de tempo limite especificado tiver decorrido. Observe que o bloqueio por capacidade máxima só é habilitado quando a BlockingCollection é criada com uma capacidade máxima especificada no construtor.
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates blocking <c>Add</c> and <c>Take</c> calls on a bounded
/// <see cref="BlockingCollection{T}"/> shared by one producer task and one
/// consumer task.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the consumer and the producer on the thread pool, then waits
    /// for the user to press Enter.
    /// </summary>
    static void Main()
    {
        // Number of items the producer pushes; raise or lower to experiment.
        int itemsToAdd = 500;

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            // Make the buffer large enough to keep every Add and Take line,
            // never going below the buffer's current dimensions.
            Console.SetBufferSize(
                Math.Max(Console.BufferWidth, 80),
                Math.Max(Console.BufferHeight, itemsToAdd * 2 + 3));
        }

        // Bounded to 50 items. Change or drop the capacity argument to see
        // how it affects the interleaving of Adds and Takes.
        var numbers = new BlockingCollection<int>(50);

        // Consumer first, producer second; neither supports cancellation.
        Task.Run(() => DrainWithBlockingTake(numbers));
        Task.Run(() => FillWithBlockingAdd(numbers, itemsToAdd));

        // Keep the console display open in debug mode.
        Console.ReadLine();
    }

    /// <summary>
    /// Removes items with the blocking <c>Take</c> call until the collection
    /// reports that it is completed.
    /// </summary>
    /// <param name="source">The collection to take items from.</param>
    static void DrainWithBlockingTake(BlockingCollection<int> source)
    {
        while (!source.IsCompleted)
        {
            int taken = -1;
            try
            {
                taken = source.Take();
            }
            catch (InvalidOperationException)
            {
                // Take threw, which this sample reports as adding having been completed.
                Console.WriteLine("Adding was completed!");
                break;
            }

            Console.WriteLine("Take:{0} ", taken);

            // Simulate a slow consumer so the collection fills up quickly
            // and the producer's Add calls will block.
            Thread.SpinWait(100000);
        }

        Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
    }

    /// <summary>
    /// Adds the values 0 through <paramref name="count"/> - 1 with the
    /// blocking <c>Add</c> call, then marks the collection as complete.
    /// </summary>
    /// <param name="target">The collection to add items to.</param>
    /// <param name="count">How many items to add.</param>
    static void FillWithBlockingAdd(BlockingCollection<int> target, int count)
    {
        int value = 0;
        while (value < count)
        {
            target.Add(value);
            Console.WriteLine("Add:{0} Count={1}", value, target.Count);
            value++;
        }

        // Signal that no more items will be added.
        target.CompleteAdding();
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module SimpleBlocking
    ''' <summary>
    ''' Demonstrates blocking Add and Take calls on a bounded
    ''' BlockingCollection shared by one producer task and one consumer task.
    ''' </summary>
    Class Program
        ''' <summary>
        ''' Starts the consumer and the producer, then waits for the user to
        ''' press Enter.
        ''' </summary>
        Shared Sub Main()
            ' Increase or decrease this value as desired.
            Dim itemsToAdd As Integer = 500

            ' Console.SetBufferSize is only supported on Windows, so guard the call.
            ' Math.Max keeps the buffer from being set below its current dimensions.
            If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows) Then
                Dim width As Integer = Math.Max(Console.BufferWidth, 80)
                Dim height As Integer = Math.Max(Console.BufferHeight, (itemsToAdd * 2) + 3)
                ' Preserve all the display output for Adds and Takes
                Console.SetBufferSize(width, height)
            End If

            ' A bounded collection. Increase, decrease, or remove the
            ' maximum capacity argument to see how it impacts behavior.
            Dim numbers = New BlockingCollection(Of Integer)(50)

            ' A simple blocking consumer with no cancellation.
            Task.Run(Sub()
                         Dim i As Integer = -1
                         While Not numbers.IsCompleted
                             Try
                                 i = numbers.Take()
                             Catch ioe As InvalidOperationException
                                 Console.WriteLine("Adding was completed!")
                                 Exit While
                             End Try
                             Console.WriteLine("Take:{0} ", i)
                             ' Simulate a slow consumer. This will cause the
                             ' collection to fill up fast and thus Adds will block.
                             Thread.SpinWait(100000)
                         End While
                         Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
                     End Sub)

            ' A simple blocking producer with no cancellation.
            Task.Run(Sub()
                         ' The upper bound of For...To is inclusive, so stop at
                         ' itemsToAdd - 1 to add exactly itemsToAdd items.
                         For i As Integer = 0 To itemsToAdd - 1
                             numbers.Add(i)
                             Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
                         Next
                         ' Signal that no more items will be added.
                         numbers.CompleteAdding()
                     End Sub)

            ' Keep the console window open in debug mode.
            Console.ReadLine()
        End Sub
    End Class
End Module
Exemplo 2
Este segundo exemplo mostra como adicionar e remover itens de forma que as operações não sejam bloqueadas. Se nenhum item estiver presente, se a capacidade máxima de uma coleção limitada tiver sido atingida ou se o tempo limite tiver expirado, a operação TryAdd ou TryTake retornará false. Isso permite que o thread faça algum trabalho útil por um tempo e, mais tarde, tente novamente recuperar um novo item ou adicionar o mesmo item que não pôde ser adicionado anteriormente. O programa também demonstra como implementar o cancelamento ao acessar uma BlockingCollection<T>.
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates non-blocking <c>TryAdd</c> and <c>TryTake</c> calls on a
/// bounded <see cref="BlockingCollection{T}"/>, with cancellation driven by
/// a key press.
/// </summary>
class ProgramWithCancellation
{
    // Total number of items the producer tries to add.
    static int inputs = 2000;

    /// <summary>
    /// Wires up the cancellation listener, runs one consumer and one
    /// producer to completion, and then waits for Enter.
    /// </summary>
    static void Main()
    {
        // Source of the cancellation request shared by both workers.
        var cts = new CancellationTokenSource();

        // Holds at most 100 items at any one time.
        var numberCollection = new BlockingCollection<int>(100);

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            // Preserve all the display output for Adds and Takes,
            // never going below the buffer's current dimensions.
            Console.SetBufferSize(
                Math.Max(Console.BufferWidth, 80),
                Math.Max(Console.BufferHeight, 8000));
        }

        // Minimal "UI thread": a single 'c' key press requests cancellation.
        Task.Run(() =>
        {
            ConsoleKeyInfo pressed = Console.ReadKey(true);
            if (pressed.KeyChar != 'c')
            {
                return;
            }

            cts.Cancel();
        });

        // One consumer and one producer, started in that order.
        Task consumer = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
        Task producer = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));

        // Block until both workers have finished.
        Task.WaitAll(new[] { consumer, producer });

        cts.Dispose();
        Console.WriteLine("Press the Enter key to exit.");
        Console.ReadLine();
    }

    /// <summary>
    /// Polls the collection with a zero-timeout <c>TryTake</c> until it is
    /// completed or the token is canceled.
    /// </summary>
    /// <param name="bc">The collection to take items from.</param>
    /// <param name="ct">Token that cancels the take loop.</param>
    static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
    {
        // IsCompleted == (IsAddingCompleted && Count == 0)
        while (!bc.IsCompleted)
        {
            int taken = 0;
            try
            {
                bool gotItem = bc.TryTake(out taken, 0, ct);
                if (gotItem)
                {
                    Console.WriteLine(" Take:{0}", taken);
                }
                else
                {
                    Console.WriteLine(" Take Blocked");
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }

            // Slow the consumer down a little so the collection fills up
            // faster and the producer reports "AddBlocked".
            Thread.SpinWait(500000);
        }

        Console.WriteLine("\r\nNo more items to take.");
    }

    /// <summary>
    /// Adds the values 0 through <c>inputs</c> - 1 using <c>TryAdd</c> with
    /// a 2 ms timeout, retrying the same value whenever an add fails.
    /// </summary>
    /// <param name="bc">The collection to add items to.</param>
    /// <param name="ct">Token that cancels the add loop.</param>
    static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
    {
        int next = 0;

        while (true)
        {
            bool added = false;

            // Cancellation surfaces as an OperationCanceledException.
            try
            {
                // A shorter timeout causes more failures.
                added = bc.TryAdd(next, 2, ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add loop canceled.");
                // Let other threads know we're done in case
                // they aren't monitoring the cancellation token.
                bc.CompleteAdding();
                break;
            }

            if (!added)
            {
                Console.Write(" AddBlocked:{0} Count = {1}", next.ToString(), bc.Count);
                // Leave the value unchanged so it is retried on the next
                // pass; do something useful in the meantime.
                UpdateProgress(next);
            }
            else
            {
                Console.WriteLine(" Add:{0}", next);
                next++;
            }

            // The check comes after the body, so at least one add is attempted.
            if (next >= inputs)
            {
                break;
            }
        }

        // No lock required here because there is only one producer.
        bc.CompleteAdding();
    }

    /// <summary>
    /// Prints <paramref name="i"/> as a percentage of <c>inputs</c>.
    /// </summary>
    /// <param name="i">The index of the item currently being added.</param>
    static void UpdateProgress(int i)
    {
        double fraction = (double)i / inputs;
        Console.WriteLine("Percent complete: {0}", fraction * 100);
    }
}
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
''' <summary>
''' Demonstrates non-blocking TryAdd and TryTake calls on a bounded
''' BlockingCollection, with cancellation driven by a key press.
''' </summary>
Class NonBlockingAccess
    ' Total number of items the producer tries to add.
    Shared inputs As Integer = 2000

    ''' <summary>
    ''' Wires up the cancellation listener, runs one consumer and one
    ''' producer to completion, and then waits for Enter.
    ''' </summary>
    Shared Sub Main()
        ' The token source for issuing the cancellation request.
        Dim cts As New CancellationTokenSource()

        ' A blocking collection that can hold no more than 100 items at a time.
        Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)

        ' Console.SetBufferSize is only supported on Windows, so guard the call.
        ' Math.Max keeps the buffer from being set below its current dimensions.
        ' 8000 rows leaves room for the Add, Take and "blocked" lines.
        If System.Runtime.InteropServices.RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows) Then
            Dim width As Integer = Math.Max(Console.BufferWidth, 80)
            Dim height As Integer = Math.Max(Console.BufferHeight, 8000)
            Console.SetBufferSize(width, height)
        End If

        ' The simplest UI thread ever invented.
        ' ReadKey(True) intercepts the key so it is not echoed into the output.
        Task.Run(Sub()
                     If Console.ReadKey(True).KeyChar = "c"c Then
                         cts.Cancel()
                     End If
                 End Sub)

        ' Start one producer and one consumer.
        Dim t1 As Task = Task.Run(Sub() NonBlockingConsumer(numberCollection, cts.Token))
        Dim t2 As Task = Task.Run(Sub() NonBlockingProducer(numberCollection, cts.Token))

        ' Wait for the tasks to complete execution
        Task.WaitAll(t1, t2)
        cts.Dispose()
        Console.WriteLine("Press the Enter key to exit.")
        Console.ReadLine()
    End Sub

    ''' <summary>
    ''' Polls the collection with a zero-timeout TryTake until it is
    ''' completed or the token is canceled.
    ''' </summary>
    ''' <param name="bc">The collection to take items from.</param>
    ''' <param name="ct">Token that cancels the take loop.</param>
    Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        ' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
        While Not bc.IsCompleted
            Dim nextItem As Integer = 0
            Try
                If bc.TryTake(nextItem, 0, ct) Then
                    Console.WriteLine(" Take: {0}", nextItem)
                Else
                    Console.WriteLine(" Take Blocked.")
                End If
            Catch ex As OperationCanceledException
                Console.WriteLine("Taking canceled.")
                Exit While
            End Try
            ' Slow down consumer just a little to cause the
            ' collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000)
        End While
        Console.WriteLine(vbCrLf & "No more items to take.")
    End Sub

    ''' <summary>
    ''' Adds the values 0 through inputs - 1 using TryAdd with a 2 ms
    ''' timeout, retrying the same value whenever an add fails.
    ''' </summary>
    ''' <param name="bc">The collection to add items to.</param>
    ''' <param name="ct">Token that cancels the add loop.</param>
    Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
        Dim itemToAdd As Integer = 0
        Dim success As Boolean = False
        Do
            ' Cancellation causes OCE. We know how to handle it.
            Try
                ' A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct)
            Catch ex As OperationCanceledException
                Console.WriteLine("Add loop canceled.")
                ' Let other threads know we're done in case
                ' they aren't monitoring the cancellation token.
                bc.CompleteAdding()
                Exit Do
            End Try

            If success Then
                Console.WriteLine(" Add:{0}", itemToAdd)
                itemToAdd += 1
            Else
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count)
                ' Don't increment itemToAdd. Try again on next iteration.
                ' Do something else useful instead.
                UpdateProgress(itemToAdd)
            End If
        Loop While itemToAdd < inputs

        ' No lock required here because only one producer.
        bc.CompleteAdding()
    End Sub

    ''' <summary>
    ''' Prints <paramref name="i"/> as a percentage of inputs.
    ''' </summary>
    ''' <param name="i">The index of the item currently being added.</param>
    Shared Sub UpdateProgress(ByVal i As Integer)
        Dim percent As Double = (CDbl(i) / inputs) * 100
        Console.WriteLine("Percent complete: {0}", percent)
    End Sub
End Class