System.Threading.Channels kitaplığı
Ad System.Threading.Channels alanı, verileri üreticiler ve tüketiciler arasında zaman uyumsuz olarak geçirmek için bir eşitleme veri yapıları kümesi sağlar. Kitaplık .NET Standard'ı hedefler ve tüm .NET uygulamaları üzerinde çalışır.
Bu kitaplık System.Threading.Channels NuGet paketinde kullanılabilir. Ancak. .NET Core 3.0 veya üzerini kullanıyorsanız paket, çerçevenin bir parçası olarak eklenir.
Üretici/tüketici kavramsal programlama modeli
Kanallar, üretici/tüketici kavramsal programlama modelinin bir uygulamasıdır. Bu programlama modelinde üreticiler zaman uyumsuz olarak veri üretir ve tüketiciler bu verileri zaman uyumsuz olarak kullanır. Başka bir deyişle, bu model ilk çıkan ("FIFO") kuyruğu aracılığıyla verileri bir taraftan diğerine geçirir. Kanalları, gibi diğer yaygın genel koleksiyon türlerinde olduğu gibi List<T>
düşünün. Birincil fark, bu koleksiyonun eşitlemeyi yönetmesi ve fabrika oluşturma seçenekleri aracılığıyla çeşitli tüketim modelleri sağlamasıdır. Bu seçenekler, kaç öğe depolamalarına izin verilenler ve bu sınıra ulaşılırsa ne olacağı veya kanala eşzamanlı olarak birden çok üretici veya birden çok tüketici tarafından erişilip erişilemeyeceği gibi kanalların davranışını denetler.
Sınırlayıcı stratejiler
Bir Channel<T>
öğesinin nasıl oluşturulduğuna bağlı olarak, okuyucusu ve yazıcısı farklı davranır.
Maksimum kapasiteyi belirten bir kanal oluşturmak için çağrısında bulunur Channel.CreateBounded. Herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılan bir kanal oluşturmak için öğesini çağırın Channel.CreateUnbounded. Her sınırlayıcı strateji, BoundedChannelOptions sırasıyla veya UnboundedChannelOptions sırasıyla çeşitli oluşturucu tanımlı seçenekleri kullanıma sunar.
Not
Sınırlayıcı stratejiden bağımsız olarak, bir kanal kapatıldıktan sonra kullanıldığında her zaman bir ChannelClosedException oluşturur.
İlişkisiz kanallar
İlişkisiz bir kanal oluşturmak için aşırı yüklemelerden birini çağırın Channel.CreateUnbounded :
var channel = Channel.CreateUnbounded<T>();
İlişkisiz bir kanal oluşturduğunuzda, kanal varsayılan olarak herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılabilir. Alternatif olarak, bir örnek sağlayarak UnboundedChannelOptions
ilişkisiz bir kanal oluştururken beklenmeyen bir davranış belirtebilirsiniz. Kanalın kapasitesi sınırsızdır ve tüm yazma işlemleri zaman uyumlu olarak gerçekleştirilir. Daha fazla örnek için bkz . Sınırsız oluşturma desenleri.
Sınırlanmış kanallar
Sınırlanmış kanal oluşturmak için aşırı yüklemelerden birini çağırın Channel.CreateBounded :
var channel = Channel.CreateBounded<T>(7);
Yukarıdaki kod, maksimum öğe kapasitesine 7
sahip bir kanal oluşturur. Sınırlanmış bir kanal oluşturduğunuzda, kanal maksimum kapasiteye bağlıdır. Sınıra ulaşıldığında, varsayılan davranış kanalın alan kullanılabilir duruma gelene kadar üreticiyi zaman uyumsuz olarak engellemesidir. Kanalı oluştururken bir seçenek belirterek bu davranışı yapılandırabilirsiniz. Sınırlanmış kanallar, sıfırdan büyük herhangi bir kapasite değeriyle oluşturulabilir. Diğer örnekler için bkz . Sınırlanmış oluşturma desenleri.
Tam mod davranışı
Sınırlanmış kanal kullanırken, yapılandırılan sınıra ulaşıldığında kanalın bağlı olduğu davranışı belirtebilirsiniz. Aşağıdaki tabloda her BoundedChannelFullMode değer için tam mod davranışları listeleniyor:
Değer | Davranış |
---|---|
BoundedChannelFullMode.Wait | Bu varsayılan değerdir. WriteAsync Yazma işlemini tamamlamak için kullanılabilir alan beklemeye yönelik çağrılar. Hemen geri dönmek false için TryWrite aramalar. |
BoundedChannelFullMode.DropNewest | Yazılan öğeye yer açmak için kanaldaki en yeni öğeyi kaldırır ve yoksayar. |
BoundedChannelFullMode.DropOldest | Yazılan öğeye yer açmak için kanaldaki en eski öğeyi kaldırır ve yoksayar. |
BoundedChannelFullMode.DropWrite | Yazılan öğeyi bırakır. |
Önemli
a Channel<TWrite,TRead>.Writer , bir'den Channel<TWrite,TRead>.Reader daha hızlı bir şekilde ürettiğinde kanalın yazarı sırt baskısı yaşar.
Üretici API'leri
Üretici işlevselliği üzerinde Channel<TWrite,TRead>.Writerkullanıma sunulur. Üretici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:
API | Beklenen davranış |
---|---|
ChannelWriter<T>.Complete | Kanalı tamamlandı olarak işaretler; başka öğe yazılmaması anlamına gelir. |
ChannelWriter<T>.TryComplete | Kanalı tamamlandı olarak işaretlemeye çalışır; başka veri yazılması gerekmez. |
ChannelWriter<T>.TryWrite | Belirtilen öğeyi kanala yazmaya çalışır. İlişkisiz bir kanalla kullanıldığında, kanalın yazıcısı veya ChannelWriter<T>.TryCompleteile ChannelWriter<T>.Completetamamlanma sinyali vermediği sürece bu her zaman döndürürtrue . |
ChannelWriter<T>.WaitToWriteAsync | ValueTask<TResult> Bir öğe yazmak için kullanılabilir alan olduğunda tamamlanan bir döndürür. |
ChannelWriter<T>.WriteAsync | Kanala zaman uyumsuz olarak bir öğe yazar. |
Tüketici API’leri
Tüketici işlevselliği üzerinde Channel<TWrite,TRead>.Readerkullanıma sunulur. Tüketici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:
API | Beklenen davranış |
---|---|
ChannelReader<T>.ReadAllAsync | Kanaldaki tüm verilerin okunmasını sağlayan bir IAsyncEnumerable<T> oluşturur. |
ChannelReader<T>.ReadAsync | Kanaldan bir öğeyi zaman uyumsuz olarak okur. |
ChannelReader<T>.TryPeek | Kanaldan bir öğeye göz atmaya çalışır. |
ChannelReader<T>.TryRead | Kanaldan bir öğeyi okumaya çalışır. |
ChannelReader<T>.WaitToReadAsync | Veriler okunmaya uygun olduğunda tamamlanan bir ValueTask<TResult> döndürür. |
Yaygın kullanım desenleri
Kanallar için çeşitli kullanım desenleri vardır. API basit, tutarlı ve olabildiğince esnek olacak şekilde tasarlanmıştır. Zaman uyumsuz yöntemlerin tümü, işlem zaman uyumlu ve hatta zaman uyumsuz olarak tamamlanırsa ayırmayı önleyebilecek basit bir zaman uyumsuz işlemi temsil eden bir (veya ValueTask<bool>
) döndürür ValueTask
. Buna ek olarak, BIR kanalın oluşturucusunun hedeflenen kullanımı hakkında sözler verdiği için API, birleştirilebilir olacak şekilde tasarlanmıştır. Belirli parametrelerle bir kanal oluşturulduğunda, iç uygulama bu vaatleri bilerek daha verimli çalışabilir.
Oluşturma desenleri
Küresel bir konum sistemi (GPS) için bir üretici/tüketici çözümü oluşturduğunuzu düşünün. Bir cihazın koordinatlarını zaman içinde izlemek istiyorsunuz. Örnek koordinatlar nesnesi şöyle görünebilir:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
İlişkisiz oluşturma desenleri
Yaygın kullanım desenlerinden biri, varsayılan bir ilişkisiz kanal oluşturmaktır:
var channel = Channel.CreateUnbounded<Coordinates>();
Ancak bunun yerine, birden çok üretici ve tüketiciyle ilişkisiz bir kanal oluşturmak istediğinizi düşünelim:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Bu durumda, tüm yazma işlemleri zaman uyumlu, hatta WriteAsync
. Bunun nedeni, ilişkisiz bir kanalın hemen etkili bir şekilde yazma için her zaman uygun alanı olmasıdır. Ancak, olarak ayarlandığındatrue
, AllowSynchronousContinuations
yazma işlemleri devamlarını yürüterek okuyucuyla ilişkili işler yapabilir. Bu işlem eşitlemesini etkilemez.
Sınırlanmış oluşturma desenleri
Sınırlanmış kanallarda, doğru tüketimin sağlanmasına yardımcı olmak için kanalın yapılandırılabilirliği tüketici tarafından bilinmelidir. Başka bir ifadeyle, yapılandırılan sınıra ulaşıldığında tüketici kanalın hangi davranışı sergilediğini bilmelidir. Şimdi yaygın sınırlanmış oluşturma desenlerinden bazılarını inceleyelim.
Sınırlanmış kanal oluşturmanın en basit yolu bir kapasite belirtmektir:
var channel = Channel.CreateBounded<Coordinates>(1);
Yukarıdaki kod, maksimum kapasiteye 1
sahip sınırlanmış bir kanal oluşturur. Diğer seçenekler kullanılabilir, bazı seçenekler ilişkisiz kanalla aynıdır, diğerleri ise ilişkisiz kanallara özeldir:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
Önceki kodda, kanal 1.000 öğeyle sınırlı bir sınırlanmış kanal olarak oluşturulur ve tek bir yazar ancak çok sayıda okuyucu bulunur. Tam mod davranışı olarak DropWrite
tanımlanır, yani kanal doluysa yazılan öğeyi bırakır.
Sınırlanmış kanallar kullanılırken bırakılan öğeleri gözlemlemek için bir itemDropped
geri çağırma kaydedin:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Kanal dolu olduğunda ve yeni bir öğe eklendiğinde geri itemDropped
arama çağrılır. Bu örnekte, sağlanan geri arama öğeyi konsola yazar, ancak istediğiniz başka bir eylemi gerçekleştirebilirsiniz.
Üretici desenleri
Bu senaryoda yapımcının kanala yeni koordinatlar yazdığını düşünün. Üretici şunu çağırarak TryWritebunu yapabilir:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Yukarıdaki üretici kodu:
- (
ChannelWriter<Coordinates>
) öğesini ilkCoordinates
ile birlikte bağımsız değişken olarak kabul ederChannel<Coordinates>.Writer
. - kullanarak
TryWrite
koordinatları taşımaya çalışan bir koşulluwhile
döngü tanımlar.
Alternatif bir üretici şu WriteAsync
yöntemi kullanabilir:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Tekrar, Channel<Coordinates>.Writer
bir while
döngü içinde kullanılır. Ancak bu kez WriteAsync yöntemi çağrılır. yöntemi ancak koordinatlar yazıldıktan sonra devam eder. Döngüden while
çıkıldığında, kanala Complete daha fazla veri yazıldığını belirten bir çağrısı yapılır.
Başka bir üretici deseni yöntemini kullanmaktır WaitToWriteAsync , aşağıdaki kodu göz önünde bulundurun:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Koşullu while
öğesinin bir parçası olarak, döngüye WaitToWriteAsync
devam edilip edilmeyeceğini belirlemek için çağrının sonucu kullanılır.
Tüketici desenleri
Çeşitli yaygın kanal tüketici desenleri vardır. Bir kanal hiç bitmediğinde, yani süresiz olarak veri ürettiğinde, tüketici bir while (true)
döngü kullanabilir ve kullanılabilir hale geldikçe verileri okuyabilir:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Not
Kanal kapatılırsa bu kod bir özel durum oluşturur.
Alternatif bir tüketici, aşağıdaki kodda gösterildiği gibi iç içe while döngüsü kullanarak bu sorundan kaçınabilir:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
Önceki kodda tüketici verileri okumak için bekler. Veriler kullanılabilir olduğunda, tüketici verileri okumaya çalışır. Bu döngüler, kanalın üreticisi artık okunacak veri olmadığını belirtene kadar değerlendirmeye devam eder. Bununla birlikte, bir üretici ürettiği sonlu sayıda öğeye sahip olduğu biliniyorsa ve tamamlanma sinyali veriyorsa, tüketici öğeler üzerinde yineleme yapmak için semantiği kullanabilir await foreach
:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Yukarıdaki kod, kanalın ReadAllAsync tüm koordinatlarını okumak için yöntemini kullanır.