Použití streamování v ASP.NET Core SignalR

Autor: Brennan Conroy

ASP.NET Core SignalR podporuje streamování z klienta na server a ze serveru do klienta. To je užitečné ve scénářích, kdy v průběhu času přicházejí fragmenty dat. Při streamování se každý fragment odešle klientovi nebo serveru, jakmile bude k dispozici, a nečeká na zpřístupnění všech dat.

Zobrazení nebo stažení ukázkového kódu (postup stažení)

Nastavení centra pro streamování

Metoda centra se automaticky stane metodou centra streamování při vrácení IAsyncEnumerable<T>, ChannelReader<T>, , Task<IAsyncEnumerable<T>>nebo Task<ChannelReader<T>>.

Streamování mezi servery

Metody centra streamování se můžou kromě IAsyncEnumerable<T> ChannelReader<T>. Nejjednodušší způsob, jak se vrátit IAsyncEnumerable<T> , je vytvořit metodu async iterátoru hubu, jak ukazuje následující ukázka. Metody asynchronního iterátoru centra můžou přijmout CancellationToken parametr, který se aktivuje, když se klient odhlásí ze streamu. Asynchronní metody iterátoru se vyhýbají problémům běžným s kanály, například nevrací ChannelReader dostatečně brzo nebo ukončete metodu bez dokončení ChannelWriter<T>.

Poznámka:

Následující ukázka vyžaduje C# 8.0 nebo novější.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Následující ukázka ukazuje základy streamování dat do klienta pomocí kanálů. Při každém zápisu objektu do objektu ChannelWriter<T>se objekt okamžitě odešle klientovi. Na konci se dokončí a sdělí klientovi, ChannelWriter že je datový proud zavřený.

Poznámka:

Napište na ChannelWriter<T> vlákno na pozadí a vraťte ChannelReader co nejdříve. Další vyvolání centra se zablokují, dokud ChannelReader se nevrátí.

Zalomení logiky try ... catch v příkazu Channelfinally Dokončete blok. Pokud chcete tok chyby provést, zachyťte ho catch uvnitř bloku a zapište ho finally do bloku.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Metody centra streamování mezi servery můžou přijímat CancellationToken parametr, který se aktivuje, když se klient odhlásí ze streamu. Tento token použijte k zastavení operace serveru a uvolnění všech prostředků, pokud se klient odpojí před koncem datového proudu.

Streamování mezi klienty a serverem

Metoda centra se automaticky stane metodou centra streamování typu klient-server, když přijme jeden nebo více objektů typu ChannelReader<T> nebo IAsyncEnumerable<T>. Následující ukázka ukazuje základy čtení streamovaných dat odesílaných z klienta. Kdykoli klient zapíše do objektu ChannelWriter<T>, data se zapíšou na ChannelReader serveru, ze kterého se čte metoda centra.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Verze IAsyncEnumerable<T> metody následuje.

Poznámka:

Následující ukázka vyžaduje C# 8.0 nebo novější.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Klient .NET

Streamování mezi servery

Metody StreamAsync a StreamAsChannelAsync metody HubConnection se používají k vyvolání metod streamování mezi servery. Předejte název metody centra a argumenty definované v metodě centra do StreamAsync nebo StreamAsChannelAsync. Obecný parametr zapnutý StreamAsync<T> a StreamAsChannelAsync<T> určuje typ objektů vrácených metodou streamování. Objekt typu IAsyncEnumerable<T> nebo ChannelReader<T> je vrácen z vyvolání datového proudu a představuje datový proud v klientovi.

Příklad StreamAsync , který vrátí IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Odpovídající StreamAsChannelAsync příklad, který vrátí ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

V předchozím kódu:

  • Metoda StreamAsChannelAsync on HubConnection se používá k vyvolání metody streamování mezi servery. Předejte název metody centra a argumenty definované v metodě centra do StreamAsChannelAsync.
  • Obecný parametr pro StreamAsChannelAsync<T> určuje typ objektů vrácených metodou streamování.
  • Vrátí ChannelReader<T> se z vyvolání datového proudu a představuje datový proud v klientovi.

Streamování mezi klienty a serverem

Existují dva způsoby, jak vyvolat metodu centra streamování typu klient-server z klienta .NET. V závislosti na vyvolané metodě centra můžete předat buď argument, nebo ChannelReader jako argument InvokeAsyncSendAsync, nebo StreamAsChannelAsync.IAsyncEnumerable<T>

Při každém zápisu dat do objektu IAsyncEnumerable nebo ChannelWriter objektu obdrží metoda centra na serveru novou položku s daty z klienta.

Pokud používáte IAsyncEnumerable objekt, datový proud končí po ukončení metody vracející položky datového proudu.

Poznámka:

Následující ukázka vyžaduje C# 8.0 nebo novější.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Nebo pokud používáte ChannelWriter, dokončíte kanál pomocí channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Javascriptový klient

Streamování mezi servery

Klienti JavaScriptu volají metody streamování mezi servery v centrech s connection.stream. Metoda stream přijímá dva argumenty:

  • Název metody centra. V následujícím příkladu je Counternázev metody centra .
  • Argumenty definované v metodě centra. V následujícím příkladu jsou argumenty počtem položek datového proudu, které se mají přijmout, a zpoždění mezi položkami datového proudu.

connection.stream vrátí hodnotu IStreamResult, která obsahuje metodu subscribe . IStreamSubscriber subscribe Předat a nastavit next, errora complete zpětná volání přijímat oznámení z stream vyvolání.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Chcete-li ukončit stream z klienta, zavolejte dispose metodusubscribe, ISubscription která je vrácena z metody. Volání této metody způsobí zrušení CancellationToken parametru hub metody, pokud jste zadali jednu.

Streamování mezi klienty a serverem

Klienti JavaScriptu volají metody streamování mezi klienty na server v centrech předáním argumentu Subject , sendinvokenebo stream, v závislosti na vyvolané metodě centra. Jedná Subject se o třídu, která vypadá jako Subject. V RxJS můžete například použít třídu Předmět z této knihovny.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Volání subject.next(item) s položkou zapíše položku do datového proudu a metoda centra přijme položku na serveru.

Chcete-li ukončit stream, zavolejte subject.complete().

Java klient

Streamování mezi servery

Klient SignalR Java používá metodu stream k vyvolání metod streamování. stream přijímá tři nebo více argumentů:

  • Očekávaný typ položek datového proudu.
  • Název metody centra.
  • Argumenty definované v metodě centra.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Metoda stream pro HubConnection vrátí Observable typu položky datového proudu. Metoda observable typu subscribe je kde onNextonError a onCompleted obslužné rutiny jsou definovány.

Streamování mezi klienty a serverem

Klient SignalR Java může volat metody streamování typu klient-server v centrech předáním pozorovatelného jako argumentu sendinvoke, nebo stream, v závislosti na vyvolané metodě centra.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Volání stream.onNext(item) s položkou zapíše položku do datového proudu a metoda centra přijme položku na serveru.

Chcete-li ukončit stream, zavolejte stream.onComplete().

Další materiály