Libreria System.Threading.Channels

Lo spazio dei nomi System.Threading.Channels fornisce un set di struttura di dati di sincronizzazione per passare i dati tra produttori e consumatori in modo asincrono. La libreria è destinata a .NET Standard e funziona su tutte le implementazioni .NET.

Questa libreria è disponibile nel pacchetto NuGet System.Threading.Channels. Tuttavia, se si usa .NET Core 3.0 o versione successiva, il pacchetto viene incluso come parte del framework.

Modello di programmazione concettuale produttore/consumatore

I canali sono un'implementazione del modello di programmazione concettuale produttore/consumatore. In questo modello di programmazione i produttori producono i dati in modo asincrono e i consumatori li utilizzano in modo asincrono. In altre parole, questo modello passa i dati da una parte a un'altra tramite una coda FIFO ("First-In First-Out"). Provare a considerare i canali come qualsiasi altro tipo di raccolta generico comune, ad esempio List<T>. La differenza principale consiste nel fatto che questa raccolta gestisce la sincronizzazione e fornisce vari modelli di consumo tramite le opzioni di creazione della factory. Tali opzioni controllano il comportamento dei canali, ad esempio quanti elementi sono autorizzati ad archiviare e cosa accade se viene raggiunto tale limite oppure se il canale è accessibile da più produttori o più consumatori contemporaneamente.

Strategie di delimitazione

A seconda della modalità di creazione di un oggetto Channel<T>, il lettore e chi scrive (writer) si comportano in modo diverso.

Per creare un canale che specifica una capacità massima, chiamare Channel.CreateBounded. Per creare un canale utilizzato contemporaneamente da un numero qualsiasi di lettori e writer, chiamare Channel.CreateUnbounded. Ogni strategia di delimitazione espone varie opzioni definite dall'autore, rispettivamente BoundedChannelOptions o UnboundedChannelOptions.

Nota

Indipendentemente dalla strategia di delimitazione, un canale genererà sempre un'eccezione ChannelClosedException quando viene usato dopo la chiusura.

Canali non delimitati

Per creare un canale non delimitato, chiamare uno degli overload Channel.CreateUnbounded:

var channel = Channel.CreateUnbounded<T>();

Quando si crea un canale non associato, per impostazione predefinita, il canale può essere usato contemporaneamente da un numero qualsiasi di lettori e writer. In alternativa, è possibile specificare un comportamento non predefinito durante la creazione di un canale non delimitato fornendo un'istanza UnboundedChannelOptions. La capacità del canale non è delimitata e tutte le scritture vengono eseguite in modo sincrono. Per altri esempi, vedere Modelli di creazione senza delimitazioni.

Canali delimitati

Per creare un canale delimitato, chiamare uno degli overload Channel.CreateBounded:

var channel = Channel.CreateBounded<T>(7);

Il codice precedente crea un canale con capacità massima di 7 elementi. Quando si crea un canale delimitato, il canale viene associato a una capacità massima. Quando viene raggiunto tale limite, il comportamento predefinito è che il canale blocchi in modo asincrono il produttore fino a quando sia disponibile dello spazio. È possibile configurare questo comportamento specificando un'opzione al momento della creazione del canale. I canali delimitati possono essere creati con qualsiasi valore di capacità maggiore di zero. Per altri esempi, vedere Modelli di creazione delimitati.

Comportamento in modalità completa

Quando si usa un canale delimitato, è possibile specificare il comportamento a cui il canale deve conformarsi quando viene raggiunto il limite configurato. La tabella seguente elenca i comportamenti in modalità completa per ogni valore BoundedChannelFullMode:

Valore Comportamento
BoundedChannelFullMode.Wait Questo è il valore predefinito. Richiama per far sì che WriteAsync attenda che ci sia spazio disponibile per completare l'operazione di scrittura. Chiama perché TryWrite restituisca immediatamente false.
BoundedChannelFullMode.DropNewest Rimuove e ignora l'elemento più recente nel canale per liberare spazio per l'elemento in corso di scrittura.
BoundedChannelFullMode.DropOldest Rimuove e ignora l'elemento meno recente nel canale per liberare spazio per l'elemento in corso di scrittura.
BoundedChannelFullMode.DropWrite Trascina l'elemento in corso di scrittura.

Importante

Ogni volta che un Channel<TWrite,TRead>.Writer produce più velocemente di quanto un Channel<TWrite,TRead>.Reader è in grado di consumare, il writer del canale sperimenta una contropressione.

API producer

La funzionalità producer viene esposta nel Channel<TWrite,TRead>.Writer. Le API producer e il comportamento previsto sono descritti in dettaglio nella tabella seguente:

API Comportamento previsto
ChannelWriter<T>.Complete Contrassegna il canale come completo, per indicare che non vengono scritti ulteriori elementi.
ChannelWriter<T>.TryComplete Tenta di contrassegnare il canale come completato, per indicare che non vengono scritti altri dati.
ChannelWriter<T>.TryWrite Tenta di scrivere l'elemento specificato nel canale. Se usato con un canale non delimitato, questo restituisce sempre true a meno che il writer del canale non segnali il completamento con ChannelWriter<T>.Complete o ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Restituisce un ValueTask<TResult> che viene completato quando è disponibile lo spazio per scrivere un elemento.
ChannelWriter<T>.WriteAsync Scrive in modo asincrono un elemento nel canale.

API utente

La funzionalità consumer viene esposta nel Channel<TWrite,TRead>.Reader. Le API consumer e il comportamento previsto sono descritti in dettaglio nella tabella seguente:

API Comportamento previsto
ChannelReader<T>.ReadAllAsync Crea un elemento IAsyncEnumerable<T> che consente la lettura di tutti i dati dal canale.
ChannelReader<T>.ReadAsync Legge in modo asincrono un elemento dal canale.
ChannelReader<T>.TryPeek Tenta di visualizzare un elemento dal canale.
ChannelReader<T>.TryRead Tenta di leggere un elemento dal canale.
ChannelReader<T>.WaitToReadAsync Restituisce un ValueTask<TResult> che viene completato quando sono disponibili i dati per la lettura.

Modelli di utilizzo comuni

Esistono diversi criteri di utilizzo per i canali. L'API è progettata per essere semplice, coerente e il più flessibile possibile. Tutti i metodi asincroni restituiscono un oggetto ValueTask (o ValueTask<bool>) che rappresenta un'operazione asincrona leggera, in grado di evitare l’allocazione se l'operazione viene completata in modo sincrono e, potenzialmente, perfino in modo asincrono. Inoltre, l'API è progettata per essere componibile, in quanto l'autore di un canale ne promette l'utilizzo previsto. Quando un canale viene creato con determinati parametri, l'implementazione interna può operare in modo più efficiente conoscendo tali promesse.

Criteri di creazione

Si supponga di creare una soluzione produttore/consumatore per un sistema di posizione globale (GPS). Si desidera tenere traccia delle coordinate di un dispositivo nel tempo. Un oggetto campione delle coordinate potrebbe essere simile al seguente:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Criteri di creazione senza limitazioni

Un criterio di utilizzo comune consiste nel creare un canale non delimitato predefinito:

var channel = Channel.CreateUnbounded<Coordinates>();

Si supponga invece di voler creare un canale non delimitato con più produttori e consumatori:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

In questo caso, tutte le scritture sono sincrone, perfino WriteAsync. Ciò è dovuto al fatto che un canale non delimitato ha sempre spazio disponibile per una scrittura efficace in modo immediato. Tuttavia, con AllowSynchronousContinuations impostato su true, le scritture potrebbero finire per eseguire operazioni associate a un lettore eseguendo le relative continuazioni. Ciò non influisce sulla sincronia dell'operazione.

Criteri di creazione delimitati

Con i canali delimitati, la configurabilità del canale deve essere nota al consumatore al fine di garantirne un consumo appropriato. Ovvero, il consumatore deve conoscere il comportamento visualizzato dal canale quando viene raggiunto il limite configurato. Verranno ora esaminati alcuni dei criteri di creazione delimitati comuni.

Il modo più semplice per creare un canale delimitato consiste nel specificare una capacità:

var channel = Channel.CreateBounded<Coordinates>(1);

Il codice precedente crea un canale delimitato con capacità massima di 1. Sono disponibili altre opzioni; alcune opzioni sono uguali a un canale non delimitato, mentre altre sono specifiche per i canali non delimitati:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Nel codice precedente il canale viene creato come canale delimitato con un limite di 1.000 elementi, con un singolo writer ma molti lettori. Il comportamento della modalità completa viene definito come DropWrite, il che significa che elimina l'elemento scritto se il canale è pieno.

Per osservare gli elementi eliminati quando si usano canali delimitati, registrare un callback itemDropped:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Ogni volta che il canale è pieno e viene aggiunto un nuovo elemento, viene richiamato il callback itemDropped. In questo esempio, il callback fornito scrive l'elemento nella console, ma è possibile eseguire qualsiasi altra azione desiderata.

Criteri produttori

Si supponga che il produttore in questo scenario stia scrivendo nuove coordinate all’interno del canale. Il produttore può eseguire questa operazione chiamando TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Il codice produttore precedente:

  • Accetta il Channel<Coordinates>.Writer( ChannelWriter<Coordinates>) come argomento, insieme all'oggetto iniziale Coordinates.
  • Definisce un ciclo condizionale while che tenta di spostare le coordinate usando TryWrite.

Un produttore alternativo potrebbe usare il metodo WriteAsync:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Anche in questo caso, Channel<Coordinates>.Writer viene usato all'interno di un ciclo while. Ma questa volta viene chiamato metodo WriteAsync. Il metodo continuerà solo dopo la scrittura delle coordinate. Quando il ciclo while si chiude, viene effettuata una chiamata a Complete, a segnalare che nel canale non vengono scritti altri dati.

Un altro modello produttore consiste nell'usare il metodoWaitToWriteAsync, considerare il codice seguente:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Come parte dell'istruzione condizionale while, il risultato della chiamata WaitToWriteAsync viene usato per stabilire se continuare il ciclo.

Criteri consumatore

Esistono diversi criteri di canali consumer comuni. Quando un canale non termina mai, ovvero produce dati per un periodo illimitato, il consumatore potrebbe usare un ciclo while (true) e leggere i dati man mano che diventano disponibili:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Nota

Questo codice genererà un'eccezione se il canale è chiuso.

Un consumatore alternativo potrebbe evitare questo problema usando un ciclo while annidato, come illustrato nel codice seguente:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Nel codice precedente il consumatore attende di leggere i dati. Quando i dati sono disponibili, il consumatore prova a leggerlo. Tali cicli continuano a effettuare valutazioni fino a quando il produttore del canale segnala che non ha più dati da leggere. Detto questo, quando un produttore è noto per avere un numero finito di elementi che produce e ne segnala il completamento, il consumatore può usare la semantica await foreach per scorrere gli elementi:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Il codice precedente usa il metodo ReadAllAsyncper leggere tutte le coordinate dal canale.

Vedi anche