BlockingCollection<T> クラス
定義
重要
一部の情報は、リリース前に大きく変更される可能性があるプレリリースされた製品に関するものです。 Microsoft は、ここに記載されている情報について、明示または黙示を問わず、一切保証しません。
IProducerConsumerCollection<T>を実装するスレッド セーフなコレクションのブロック機能と境界機能を提供します。
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::Generic::IReadOnlyCollection<T>, System::Collections::ICollection
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.Versioning.UnsupportedOSPlatform("browser")>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface ICollection
interface IDisposable
interface IReadOnlyCollection<'T>
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T), IReadOnlyCollection(Of T)
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T)
型パラメーター
- T
コレクション内の要素の型。
- 継承
-
BlockingCollection<T>
- 属性
- 実装
例
次の例は、ブロックコレクションから同時に項目を追加および取得する方法を示しています。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class BlockingCollectionDemo
{
static async Task Main()
{
await AddTakeDemo.BC_AddTakeCompleteAdding();
TryTakeDemo.BC_TryTake();
FromToAnyDemo.BC_FromToAny();
await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
}
}
class AddTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
public static async Task BC_AddTakeCompleteAdding()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Spin up a Task to populate the BlockingCollection
Task t1 = Task.Run(() =>
{
bc.Add(1);
bc.Add(2);
bc.Add(3);
bc.CompleteAdding();
});
// Spin up a Task to consume the BlockingCollection
Task t2 = Task.Run(() =>
{
try
{
// Consume the BlockingCollection
while (true) Console.WriteLine(bc.Take());
}
catch (InvalidOperationException)
{
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
}
});
await Task.WhenAll(t1, t2);
}
}
}
class TryTakeDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
public static void BC_TryTake()
{
// Construct and fill our BlockingCollection
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
int NUMITEMS = 10000;
for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
bc.CompleteAdding();
int outerSum = 0;
// Delegate for consuming the BlockingCollection and adding up all items
Action action = () =>
{
int localItem;
int localSum = 0;
while (bc.TryTake(out localItem)) localSum += localItem;
Interlocked.Add(ref outerSum, localSum);
};
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action);
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
}
}
}
class FromToAnyDemo
{
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
public static void BC_FromToAny()
{
BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
int numFailures = 0;
for (int i = 0; i < 10; i++)
{
if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
}
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);
// Should be able to retrieve 10 items
int numItems = 0;
int item;
while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
}
}
class ConsumingEnumerableDemo
{
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
public static async Task BC_GetConsumingEnumerable()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
// Kick off a producer task
var producerTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
bc.Add(i);
Console.WriteLine($"Producing: {i}");
await Task.Delay(100); // sleep 100 ms between adds
}
// Need to do this to keep foreach below from hanging
bc.CompleteAdding();
});
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
{
Console.WriteLine($"Consuming: {item}");
}
await producerTask; // Allow task to complete cleanup
}
}
}
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
module AddTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
let blockingCollectionAddTakeCompleteAdding () =
task {
use bc = new BlockingCollection<int>()
// Spin up a Task to populate the BlockingCollection
let t1 =
task {
bc.Add 1
bc.Add 2
bc.Add 3
bc.CompleteAdding()
}
// Spin up a Task to consume the BlockingCollection
let t2 =
task {
try
// Consume consume the BlockingCollection
while true do
printfn $"{bc.Take()}"
with :? InvalidOperationException ->
// An InvalidOperationException means that Take() was called on a completed collection
printfn "That's All!"
}
let! _ = Task.WhenAll(t1, t2)
()
}
module TryTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
let blockingCollectionTryTake () =
// Construct and fill our BlockingCollection
use bc = new BlockingCollection<int>()
let NUMITEMS = 10000;
for i = 0 to NUMITEMS - 1 do
bc.Add i
bc.CompleteAdding()
let mutable outerSum = 0
// Delegate for consuming the BlockingCollection and adding up all items
let action =
Action(fun () ->
let mutable localItem = 0
let mutable localSum = 0
while bc.TryTake &localItem do
localSum <- localSum + localItem
Interlocked.Add(&outerSum, localSum)
|> ignore)
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
printfn $"Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}"
printfn $"bc.IsCompleted = {bc.IsCompleted} (should be true)"
module FromToAnyDemo =
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
let blockingCollectionFromToAny () =
let bcs =
[|
new BlockingCollection<int>(5) // collection bounded to 5 items
new BlockingCollection<int>(5) // collection bounded to 5 items
|]
// Should be able to add 10 items w/o blocking
let mutable numFailures = 0;
for i = 0 to 9 do
if BlockingCollection<int>.TryAddToAny(bcs, i) = -1 then
numFailures <- numFailures + 1
printfn $"TryAddToAny: {numFailures} failures (should be 0)"
// Should be able to retrieve 10 items
let mutable numItems = 0
let mutable item = 0
while BlockingCollection<int>.TryTakeFromAny(bcs, &item) <> -1 do
numItems <- numItems + 1
printfn $"TryTakeFromAny: retrieved {numItems} items (should be 10)"
module ConsumingEnumerableDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
let blockingCollectionGetConsumingEnumerable () =
task {
use bc = new BlockingCollection<int>()
// Kick off a producer task
let producerTask =
task {
for i = 0 to 9 do
bc.Add i
printfn $"Producing: {i}"
do! Task.Delay 100 // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
bc.CompleteAdding()
}
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
for item in bc.GetConsumingEnumerable() do
printfn $"Consuming: {item}"
do! producerTask // Allow task to complete cleanup
}
let main =
task {
do! AddTakeDemo.blockingCollectionAddTakeCompleteAdding ()
TryTakeDemo.blockingCollectionTryTake ()
FromToAnyDemo.blockingCollectionFromToAny ()
do! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable ()
printfn "Press any key to exit."
Console.ReadKey(true) |> ignore
}
main.Wait()
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Imports System.Threading
Class BlockingCollectionDemo
Shared Sub Main()
AddTakeDemo.BC_AddTakeCompleteAdding()
TryTakeDemo.BC_TryTake()
ToAnyDemo.BC_ToAny()
ConsumingEnumerableDemo.BC_GetConsumingEnumerable()
' Keep the console window open in debug mode
Console.WriteLine("Press any key to exit.")
Console.ReadKey()
End Sub
End Class
Class AddTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.Take()
' BlockingCollection<T>.CompleteAdding()
Shared Sub BC_AddTakeCompleteAdding()
Using bc As New BlockingCollection(Of Integer)()
' Spin up a Task to populate the BlockingCollection
Using t1 As Task = Task.Factory.StartNew(
Sub()
bc.Add(1)
bc.Add(2)
bc.Add(3)
bc.CompleteAdding()
End Sub)
' Spin up a Task to consume the BlockingCollection
Using t2 As Task = Task.Factory.StartNew(
Sub()
Try
' Consume the BlockingCollection
While True
Console.WriteLine(bc.Take())
End While
Catch generatedExceptionName As InvalidOperationException
' An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!")
End Try
End Sub)
Task.WaitAll(t1, t2)
End Using
End Using
End Using
End Sub
End Class
'Imports System.Collections.Concurrent
'Imports System.Threading
'Imports System.Threading.Tasks
Class TryTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.TryTake()
' BlockingCollection<T>.IsCompleted
Shared Sub BC_TryTake()
' Construct and fill our BlockingCollection
Using bc As New BlockingCollection(Of Integer)()
Dim NUMITEMS As Integer = 10000
For i As Integer = 0 To NUMITEMS - 1
bc.Add(i)
Next
bc.CompleteAdding()
Dim outerSum As Integer = 0
' Delegate for consuming the BlockingCollection and adding up all items
Dim action As Action =
Sub()
Dim localItem As Integer
Dim localSum As Integer = 0
While bc.TryTake(localItem)
localSum += localItem
End While
Interlocked.Add(outerSum, localSum)
End Sub
' Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2))
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted)
End Using
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' Bounded BlockingCollection<T>
' BlockingCollection<T>.TryAddToAny()
' BlockingCollection<T>.TryTakeFromAny()
Class ToAnyDemo
Shared Sub BC_ToAny()
Dim bcs As BlockingCollection(Of Integer)() = New BlockingCollection(Of Integer)(1) {}
bcs(0) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
bcs(1) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
' Should be able to add 10 items w/o blocking
Dim numFailures As Integer = 0
For i As Integer = 0 To 9
If BlockingCollection(Of Integer).TryAddToAny(bcs, i) = -1 Then
numFailures += 1
End If
Next
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures)
' Should be able to retrieve 10 items
Dim numItems As Integer = 0
Dim item As Integer
While BlockingCollection(Of Integer).TryTakeFromAny(bcs, item) <> -1
numItems += 1
End While
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems)
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.GetConsumingEnumerable()
Class ConsumingEnumerableDemo
Shared Sub BC_GetConsumingEnumerable()
Using bc As New BlockingCollection(Of Integer)()
' Kick off a producer task
Task.Factory.StartNew(
Sub()
For i As Integer = 0 To 9
bc.Add(i)
' sleep 100 ms between adds
Thread.Sleep(100)
Next
' Need to do this to keep foreach below from not responding.
bc.CompleteAdding()
End Sub)
' Now consume the blocking collection with foreach.
' Use bc.GetConsumingEnumerable() instead of just bc because the
' former will block waiting for completion and the latter will
' simply take a snapshot of the current state of the underlying collection.
For Each item In bc.GetConsumingEnumerable()
Console.WriteLine(item)
Next
End Using
End Sub
End Class
注釈
BlockingCollection<T> は、以下を提供するスレッド セーフなコレクション クラスです。
プロデューサー/コンシューマー パターンの実装。BlockingCollection<T> は、IProducerConsumerCollection<T> インターフェイスのラッパーです。
TryAdd または TryTake メソッドで CancellationToken オブジェクトを使用して、Add または Take 操作を取り消します。
大事な
この型は、IDisposable インターフェイスを実装します。 型の使用が完了したら、直接または間接的に破棄する必要があります。 型を直接破棄するには、try
/catch
ブロックでその Dispose メソッドを呼び出します。 間接的に破棄するには、using
(C#) や Using
(Visual Basic) などの言語コンストラクトを使用します。 詳細については、「IDisposable インターフェイス」トピックの「IDisposable を実装するオブジェクトの使用」セクションを参照してください。 また、Dispose() メソッドはスレッド セーフではないことに注意してください。
BlockingCollection<T> の他のすべてのパブリック メンバーと保護されたメンバーはスレッド セーフであり、複数のスレッドから同時に使用できます。
IProducerConsumerCollection<T> は、スレッド セーフなデータの追加と削除を可能にするコレクションを表します。 BlockingCollection<T> は、IProducerConsumerCollection<T> インスタンスのラッパーとして使用され、データを削除できるようになるまでコレクションからの削除試行をブロックできます。 同様に、BlockingCollection<T> を作成して、IProducerConsumerCollection<T>で許可されているデータ要素の数に上限を適用できます。コレクションに対する追加の試行は、追加された項目を格納するための領域が使用可能になるまでブロックされる可能性があります。 この方法では、BlockingCollection<T> は従来のブロッキング キュー データ構造に似ていますが、基になるデータ ストレージ メカニズムが IProducerConsumerCollection<T>として抽象化される点が異なります。
BlockingCollection<T> では、境界とブロックがサポートされます。 境界は、コレクションの最大容量を設定できることを意味します。 境界は、メモリ内のコレクションの最大サイズを制御できるため、特定のシナリオで重要です。これにより、生成されるスレッドが消費するスレッドの前に移動しすぎなくなります。複数のスレッドまたはタスクが同時にコレクションに項目を追加でき、コレクションが指定された最大容量に達すると、アイテムが削除されるまで生成スレッドはブロックされます。 複数のコンシューマーが同時に項目を削除でき、コレクションが空になると、プロデューサーが項目を追加するまで、消費スレッドはブロックされます。 生成スレッドは、CompleteAdding メソッドを呼び出して、追加される項目がないことを示すことができます。 コンシューマーは、IsCompleted プロパティを監視して、コレクションが空で項目が追加されないタイミングを確認します。
通常、Add 操作と Take 操作はループで実行されます。 ループを取り消すには、CancellationToken オブジェクトを TryAdd メソッドまたは TryTake メソッドに渡し、各反復処理でトークンの IsCancellationRequested プロパティの値を確認します。 値が true
場合は、リソースをクリーンアップしてループを終了することで、取り消し要求に応答する必要があります。
BlockingCollection<T> オブジェクトを作成する場合は、バインドされた容量だけでなく、使用するコレクションの種類も指定できます。 たとえば、先入れ先出し (FIFO) 動作に対して ConcurrentQueue<T> オブジェクトを指定したり、最後の入力、先入れ先出し (LIFO) 動作に ConcurrentStack<T> オブジェクトを指定したりできます。 IProducerConsumerCollection<T> インターフェイスを実装する任意のコレクション クラスを使用できます。 BlockingCollection<T> の既定のコレクションの種類は ConcurrentQueue<T>です。
基になるコレクションを直接変更しないでください。 BlockingCollection<T> メソッドを使用して要素を追加または削除します。 基になるコレクションを直接変更すると、BlockingCollection<T> オブジェクトが破損する可能性があります。
BlockingCollection<T> は非同期アクセスを念頭に置いて設計されていません。 アプリケーションで非同期プロデューサー/コンシューマー シナリオが必要な場合は、代わりに Channel<T> の使用を検討してください。
コンストラクター
BlockingCollection<T>() |
BlockingCollection<T> クラスの新しいインスタンスを、上限なしで初期化します。 |
BlockingCollection<T>(Int32) |
指定した上限を使用して、BlockingCollection<T> クラスの新しいインスタンスを初期化します。 |
BlockingCollection<T>(IProducerConsumerCollection<T>) |
指定された IProducerConsumerCollection<T> を基になるデータ ストアとして使用して、上限を指定せずに、BlockingCollection<T> クラスの新しいインスタンスを初期化します。 |
BlockingCollection<T>(IProducerConsumerCollection<T>, Int32) |
指定した上限を使用し、指定した IProducerConsumerCollection<T> を基になるデータ ストアとして使用して、BlockingCollection<T> クラスの新しいインスタンスを初期化します。 |
プロパティ
BoundedCapacity |
この BlockingCollection<T> インスタンスの有界容量を取得します。 |
Count |
BlockingCollection<T>に含まれる項目の数を取得します。 |
IsAddingCompleted |
この BlockingCollection<T> が追加のために完了としてマークされているかどうかを取得します。 |
IsCompleted |
この BlockingCollection<T> が追加のために完了としてマークされ、空であるかどうかを取得します。 |
メソッド
明示的なインターフェイスの実装
ICollection.CopyTo(Array, Int32) |
ターゲット配列の指定したインデックスから始まる、BlockingCollection<T> インスタンス内のすべての項目を互換性のある 1 次元配列にコピーします。 |
ICollection.IsSynchronized |
ICollection へのアクセスが同期されているかどうかを示す値を取得します (スレッド セーフ)。 |
ICollection.SyncRoot |
ICollectionへのアクセスを同期するために使用できるオブジェクトを取得します。 このプロパティはサポートされていません。 |
IEnumerable.GetEnumerator() |
コレクション内の項目の IEnumerator を提供します。 |
IEnumerable<T>.GetEnumerator() |
コレクション内の項目の IEnumerator<T> を提供します。 |
拡張メソッド
適用対象
スレッド セーフ
Dispose メソッドはスレッド セーフではありません。 BlockingCollection<T> の他のすべてのパブリック メンバーと保護されたメンバーはスレッド セーフであり、複数のスレッドから同時に使用できます。
こちらもご覧ください
.NET