Tutorial: Generación y uso de secuencias asincrónicas con C# y .NET

Las secuencias asincrónicas modelan un origen de datos de streaming. Las secuencias de datos suelen recuperar o generar elementos de forma asincrónica. Proporcionan un modelo de programación natural para los orígenes de datos de streaming asincrónicos.

En este tutorial aprenderá lo siguiente:

  • Crear un origen de datos que genera una secuencia de elementos de datos de forma asincrónica.
  • Utilizar ese origen de datos de forma asincrónica.
  • Admitir la cancelación y los contextos capturados para secuencias asincrónicas.
  • Reconocer cuándo la interfaz y el origen de datos nuevos son preferibles a las secuencias de datos sincrónicas anteriores.

Requisitos previos

Deberá configurar el equipo para que ejecute .NET, incluido el compilador de C#. El compilador de C# está disponible con Visual Studio 2022 o el SDK de .NET.

Deberá crear un token de acceso de GitHub para poder tener acceso al punto de conexión de GraphQL de GitHub. Seleccione los siguientes permisos para el token de acceso de GitHub:

  • repo:status
  • public_repo

Guarde el token de acceso en un lugar seguro para usarlo a fin de obtener acceso al punto de conexión de API de GitHub.

Advertencia

Mantenga seguro su token de acceso personal. Cualquier software con su token de acceso personal podría realizar llamadas de API de GitHub con sus derechos de acceso.

En este tutorial se da por supuesto que conoce bien C# y. NET, incluidos Visual Studio o la CLI de .NET.

Ejecución de la aplicación de inicio

Puede obtener el código para la aplicación de inicio usada en este tutorial en el repositorio dotnet/docs de la carpeta asynchronous-programming/snippets.

La aplicación de inicio es una aplicación de consola que usa la interfaz GraphQL de GitHub para recuperar las incidencias recientes escritas en el repositorio dotnet/docs. Comience por mirar el código siguiente para el método Main de la aplicación de inicio:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Puede establecer una variable de entorno GitHubKey para el token de acceso personal, o bien puede reemplazar el último argumento en la llamada a GetEnvVariable por el token de acceso personal. No coloque el código de acceso en el código fuente si va a compartir el origen con otros usuarios. No cargue nunca códigos de acceso en un repositorio de código fuente compartido.

Después de crear el cliente de GitHub, el código de Main crea un objeto de informe de progreso y un token de cancelación. Una vez que se crean esos objetos, Main llama a RunPagedQueryAsync para recuperar las 250 incidencias creadas más recientemente. Una vez finalizada esa tarea, se muestran los resultados.

Al ejecutar la aplicación inicial, puede realizar algunas observaciones importantes acerca de cómo se ejecuta esta aplicación. Verá el progreso notificado para cada página devuelta desde GitHub. Puede observar una pausa marcada antes de que GitHub devuelva cada nueva página de incidencias. Por último, se muestran las incidencias solo después de que se hayan recuperado 10 páginas de GitHub.

Examen de la implementación

La implementación revela por qué observó el comportamiento descrito en la sección anterior. Examine el código de RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Lo primero que hace este método es crear el objeto POST mediante la clase GraphQLRequest:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

que ayuda a formar el cuerpo del objeto POST y convertirlo correctamente en JSON presentado como una sola cadena con el método ToJsonText, que quita todos los caracteres de nueva línea del cuerpo de la solicitud que los marcan con el carácter de escape \ (barra diagonal inversa).

Vamos a concentrarnos en el algoritmo de paginación y la estructura asincrónica del código anterior. (Puede consultar la documentación de GraphQL de GitHub para obtener más información sobre la API de GraphQL de GitHub). El método RunPagedQueryAsync enumera las incidencias desde la más reciente hasta la más antigua. Solicita 25 incidencias por página y examina la estructura pageInfo de la respuesta para continuar con la página anterior. Eso sigue al soporte de paginación estándar de GraphQL para respuestas de varias páginas. La respuesta incluye un objeto pageInfo que incluye a su vez un valor hasPreviousPages y un valor startCursor usado para solicitar la página anterior. Las incidencias se encuentran en la matriz nodes. El método RunPagedQueryAsync anexa estos nodos a una matriz que contiene todos los resultados de todas las páginas.

Después de recuperar y restaurar una página de resultados, RunPagedQueryAsync informa del progreso y comprueba la cancelación. Si se ha solicitado la cancelación, RunPagedQueryAsync lanza un OperationCanceledException.

Hay varios elementos en este código que se pueden mejorar. Lo más importante, RunPagedQueryAsync debe asignar el almacenamiento para todas las incidencias devueltas. Este ejemplo se detiene en 250 incidencias porque la recuperación de todas las incidencias abiertas requeriría mucha más memoria para almacenar todas las incidencias recuperadas. Los protocolos para admitir los informes de progreso y la cancelación hacen que el algoritmo sea más difícil de comprender en su primera lectura. Hay más tipos y API implicados. Debe realizar un seguimiento de las comunicaciones a través de CancellationTokenSource y su CancellationToken asociado para comprender dónde se solicita la cancelación y dónde se concede.

Las secuencias asincrónicas ofrecen una manera mejor

Las secuencias asincrónicas y el lenguaje asociado abordan todas estas cuestiones. El código que genera la secuencia ahora puede usar yield return para devolver los elementos en un método que se declaró con el modificador async. Puede usar una secuencia asincrónica utilizando un bucle await foreach igual que puede usar una secuencia mediante un bucle foreach.

Estas nuevas características del lenguaje dependen de tres nuevas interfaces agregadas a .NET Standard 2.1 e implementadas en .NET Core 3.0:

Estas tres interfaces deben resultar familiares a la mayoría de desarrolladores de C#. Se comportan de manera similar con sus contrapartes sincrónicas:

Un tipo que podría ser desconocido es System.Threading.Tasks.ValueTask. La estructura ValueTask proporciona una API similar a la clase System.Threading.Tasks.Task. ValueTask se usa en estas interfaces por motivos de rendimiento.

Conversión en secuencias asincrónicas

A continuación, convierta el método RunPagedQueryAsync para generar una secuencia asincrónica. En primer lugar, cambie la signatura de RunPagedQueryAsync para que devuelva un IAsyncEnumerable<JToken>y quite el token de cancelación y los objetos de progreso de la lista de parámetros como se muestra en el código siguiente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

El código de inicio procesa cada página a medida que se recupera, tal como se muestra en el código siguiente:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Reemplace esas tres líneas por el código siguiente:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

También puede quitar la declaración de finalResults anteriormente en este método y la instrucción return que sigue al bucle modificado.

Ha terminado los cambios para generar una secuencia asincrónica. El método finalizado debería ser similar al código siguiente:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A continuación, cambie el código que utiliza la colección para usar la secuencia asincrónica. Busque el código siguiente en Main que procesa la colección de incidencias:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Reemplace el código por el siguiente bucle await foreach:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

La nueva interfaz IAsyncEnumerator<T> deriva de IAsyncDisposable. Esto significa que el bucle anterior desechará la secuencia de forma asincrónica cuando finalice el bucle. Como imaginará, el bucle es similar al código siguiente:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Los elementos de secuencia se procesan de forma predeterminada en el contexto capturado. Si quiere deshabilitar la captura del contexto, use el método de extensión TaskAsyncEnumerableExtensions.ConfigureAwait. Para obtener más información sobre los contextos de sincronización y la captura del contexto actual, vea el artículo sobre el consumo del patrón asincrónico basado en tareas.

Las secuencias asincrónicas admiten la cancelación mediante el mismo protocolo que otros métodos async. Para admitir la cancelación, debe modificar la firma del método de iterador asincrónico como se indica a continuación:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

El atributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute hace que el compilador genere código para IAsyncEnumerator<T>, que hace que el token que se pasa a GetAsyncEnumerator sea visible al cuerpo del iterador asincrónico como ese argumento. En runQueryAsync, puede examinar el estado del token y cancelar el trabajo posterior si es necesario.

Se puede usar otro método de extensión, WithCancellation, para pasar el token de cancelación a la secuencia asincrónica. Modifique el bucle que enumera los problemas de la siguiente manera:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Puede obtener el código para el tutorial finalizado en el repositorio dotnet/docs de la carpeta asynchronous-programming/snippets.

Ejecución de la aplicación finalizada

Vuelva a ejecutar la aplicación. Compare su comportamiento con el comportamiento de la aplicación de inicio. La primera página de resultados se enumera en cuanto está disponible. Hay una pausa marcada cada vez que se solicita y se recupera una página nueva; a continuación, se enumeran rápidamente los resultados de la página siguiente. El bloque try / catch no es necesario para controlar la cancelación: el autor de la llamada puede detener la enumeración de la colección. El progreso se notifica claramente porque la secuencia asincrónica genera resultados a medida que se descarga cada página. El estado de cada problema devuelto se incluye sin problemas en el bucle await foreach. No necesita un objeto de devolución de llamada para hacer un seguimiento del progreso.

Puede ver mejoras en el uso de la memoria examinando el código. Ya no tiene que asignar una colección para almacenar todos los resultados antes de que se enumeren. El autor de la llamada puede determinar cómo consumir los resultados y si se necesita una colección de almacenamiento.

Ejecute las aplicaciones de inicio y finalizada y podrá ver las diferencias entre las implementaciones personalmente. Puede eliminar el token de acceso de GitHub que creó cuando inició este tutorial cuando haya terminado. Si un atacante obtuviera acceso a dicho token, podría tener acceso a sus API de GitHub con sus credenciales.

En este tutorial, ha usado flujos asincrónicos para leer elementos individuales de una API de red que devuelve páginas de datos. Los flujos asincrónicos también pueden leer "flujos que nunca terminan", como un teletipo de bolsa o un sensor. La llamada a MoveNextAsync devuelve el siguiente elemento en cuanto está disponible.