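How to: Use ForEach to Remove Items in a BlockingCollection
In addition to taking items from a BlockingCollection<T> by using the Take and TryTake methods, you can also remove items with a foreach (For Each in Visual Basic) loop over the collection's GetConsumingEnumerable method. The loop runs until adding is completed and the collection is empty. This is called a "mutating enumeration" or "consuming enumeration" because, unlike a typical foreach (For Each) loop, this enumerator modifies the source collection by removing items.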
Example
The following example shows how to remove all the items in a BlockingCollection<T> by using a foreach (For Each) loop.
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks

Module EnumerateBC

    Class Program

        ' Limit the collection size to 1000 items
        ' at any given time. Set itemsToProduce to >500
        ' to hit the limit.
        Const upperLimit As Integer = 1000

        ' Adjust this number to see how it impacts
        ' the producing-consuming pattern.
        Const itemsToProduce As Integer = 100

        Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)

        ' Variables for diagnostic output only.
        Shared sw As New Stopwatch()
        Shared totalAdditions As Integer = 0

        ' Number of producer tasks. Not referenced in this example;
        ' ContinueWhenAll synchronizes the producers instead.
        Shared producersStillRunning As Integer = 2

        Shared Sub Main()
            ' Start the stopwatch.
            sw.Start()

            ' Queue the Producer threads.
            Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
            Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))

            ' Store in an array for use with ContinueWhenAll
            Dim producers() As Task = {task1, task2}

            ' Create a cleanup task that will call CompleteAdding after
            ' all producers are done adding items.
            Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())

            ' Queue the Consumer thread. Start it now, without waiting
            ' for the producers to finish, so that it begins consuming
            ' as soon as the producers add items.
            Task.Factory.StartNew(Sub() RunConsumer())

            ' Keep the console window open while the
            ' consumer thread completes its output.
            Console.ReadKey()
        End Sub

        Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
            Dim additions As Integer = 0
            For i As Integer = start To start + itemsToProduce - 1
                ' The data that is added to the collection.
                Dim ticks As Long = sw.ElapsedTicks

                ' Display additions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)

                ' Don't try to add item after CompleteAdding
                ' has been called.
                If collection.IsAddingCompleted = False Then
                    collection.Add(ticks)
                End If

                ' Counter for demonstration purposes only.
                additions = additions + 1

                ' Spin to slow down the producer threads without sleeping.
                ' Comment out this line to remove the delay.
                Thread.SpinWait(100000)
            Next

            Interlocked.Add(totalAdditions, additions)
            Console.WriteLine("{0} is done adding: {1} items", ID, additions)
        End Sub

        Shared Sub RunConsumer()
            ' GetConsumingEnumerable returns an enumerable that removes
            ' each item from the collection as it is returned.
            Dim subtractions As Integer = 0
            For Each item In collection.GetConsumingEnumerable()
                subtractions = subtractions + 1
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                                  item.ToString("D18"), subtractions, collection.Count)
            Next

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                              totalAdditions, subtractions, collection.Count)
            sw.Stop()

            Console.WriteLine("Press any key to exit.")
        End Sub
    End Class
End Module
namespace EnumerateBlockingCollection
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        // Limit the collection size to 1000 items
        // at any given time. Set itemsToProduce to >500
        // to hit the limit.
        const int upperLimit = 1000;

        // Adjust this number to see how it impacts
        // the producing-consuming pattern.
        const int itemsToProduce = 100;

        static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

        // Variables for diagnostic output only.
        static Stopwatch sw = new Stopwatch();
        static int totalAdditions = 0;

        // Number of producer tasks. Not referenced in this example;
        // ContinueWhenAll synchronizes the producers instead.
        static int producersStillRunning = 2;

        static void Main(string[] args)
        {
            // Start the stopwatch.
            sw.Start();

            // Queue the Producer threads. Store in an array
            // for use with ContinueWhenAll
            Task[] producers = new Task[2];
            producers[0] = Task.Factory.StartNew(() => RunProducer("A", 0));
            producers[1] = Task.Factory.StartNew(() => RunProducer("B", itemsToProduce));

            // Create a cleanup task that will call CompleteAdding after
            // all producers are done adding items.
            Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

            // Queue the Consumer thread. Start it now, without waiting
            // for the producers to finish, so that it begins consuming
            // as soon as the producers add items.
            Task.Factory.StartNew(() => RunConsumer());

            // Keep the console window open while the
            // consumer thread completes its output.
            Console.ReadKey();
        }

        static void RunProducer(string ID, int start)
        {
            int additions = 0;
            for (int i = start; i < start + itemsToProduce; i++)
            {
                // The data that is added to the collection.
                long ticks = sw.ElapsedTicks;

                // Display additions.
                Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

                // Don't try to add item after CompleteAdding
                // has been called.
                if (!collection.IsAddingCompleted)
                    collection.Add(ticks);

                // Counter for demonstration purposes only.
                additions++;

                // Spin to slow down the producer threads without sleeping.
                // Comment out this line to remove the delay.
                Thread.SpinWait(100000);
            }

            Interlocked.Add(ref totalAdditions, additions);
            Console.WriteLine("{0} is done adding: {1} items", ID, additions);
        }

        static void RunConsumer()
        {
            // GetConsumingEnumerable returns an enumerable that removes
            // each item from the collection as it is returned.
            int subtractions = 0;
            foreach (var item in collection.GetConsumingEnumerable())
            {
                subtractions++;
                Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                        item.ToString("D18"), subtractions, collection.Count);
            }

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                    totalAdditions, subtractions, collection.Count);
            sw.Stop();

            Console.WriteLine("Press any key to exit");
        }
    }
}
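This example uses a foreach loop with the BlockingCollection<T>.GetConsumingEnumerable method in the consuming thread, which causes each item to be removed from the collection as it is enumerated. System.Collections.Concurrent.BlockingCollection<T> can limit the maximum number of items that are in the collection at any time; here the limit is the upperLimit value passed to the constructor. Enumerating the collection in this way blocks the consumer thread if no items are available or if the collection is empty, and the loop ends only after adding is completed and the collection is empty. In this example, blocking is not a concern because the producer threads add items faster than they can be consumed.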
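The bounding and completion behavior described above can be sketched on a single thread. This is a minimal illustration, not part of the original sample: the class name BoundedDemo and the capacity of 2 are chosen only for the demonstration, and the printed order assumes the default underlying collection, which is a ConcurrentQueue<T>.

```csharp
using System;
using System.Collections.Concurrent;

class BoundedDemo
{
    static void Main()
    {
        // Hypothetical capacity of 2, chosen only to reach the limit quickly.
        var bounded = new BlockingCollection<int>(2);
        bounded.Add(1);
        bounded.Add(2);

        // The collection is full, so TryAdd returns false immediately.
        // A plain Add would block here until a consumer removed an item.
        bool added = bounded.TryAdd(3);
        Console.WriteLine("TryAdd at capacity: {0}", added);   // False

        // Without CompleteAdding, the loop below would block forever
        // once the collection became empty.
        bounded.CompleteAdding();

        foreach (int item in bounded.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumed {0}", item);            // 1, then 2
        }

        // True: adding is completed and the collection is empty.
        Console.WriteLine("IsCompleted: {0}", bounded.IsCompleted);
    }
}
```

In a real producer-consumer setup the producers and the consumer run on separate threads, as in the example above, so a blocked Add resumes as soon as the consumer removes an item.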
There is no guarantee that the items are enumerated in the same order in which they are added by the producer threads.
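To enumerate the collection without modifying it, just use foreach (For Each) without the GetConsumingEnumerable method. However, it is important to understand that this kind of enumeration represents a snapshot of the collection at a precise point in time. If other threads are adding or removing items concurrently while you are executing the loop, then the loop might not represent the actual state of the collection.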
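The difference between the two kinds of enumeration can be seen in a short single-threaded sketch. It is an illustration only: the class name SnapshotDemo and the three sample values are assumptions made for the demonstration and do not come from the example above.

```csharp
using System;
using System.Collections.Concurrent;

class SnapshotDemo
{
    static void Main()
    {
        var collection = new BlockingCollection<int>();
        for (int i = 0; i < 3; i++)
        {
            collection.Add(i);
        }
        collection.CompleteAdding();

        // Plain foreach: enumerates a snapshot and removes nothing.
        int seen = 0;
        foreach (int item in collection)
        {
            seen++;
        }
        Console.WriteLine("Snapshot: seen={0}, Count={1}", seen, collection.Count);
        // Snapshot: seen=3, Count=3

        // Consuming enumeration: each item is removed as it is returned.
        int consumed = 0;
        foreach (int item in collection.GetConsumingEnumerable())
        {
            consumed++;
        }
        Console.WriteLine("Consuming: consumed={0}, Count={1}", consumed, collection.Count);
        // Consuming: consumed=3, Count=0
    }
}
```

Because no other thread touches the collection here, the snapshot matches its real contents. With concurrent producers or consumers, the items seen by the plain foreach loop could already be out of date by the time the loop finishes.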