Cenário de fan-out/fan-in em funções duráveis - Exemplo de backup na nuvem

Fan-out/fan-in refere-se ao padrão de executar várias funções simultaneamente e, em seguida, executar alguma agregação nos resultados. Este artigo explica um exemplo que usa funções duráveis para implementar um cenário de fan-in/fan-out. O exemplo é uma função durável que faz backup de todo ou parte do conteúdo do site de um aplicativo no Armazenamento do Azure.

Nota

A versão 4 do modelo de programação Node.js para o Azure Functions está disponível em geral. O novo modelo v4 foi projetado para ter uma experiência mais flexível e intuitiva para desenvolvedores de JavaScript e TypeScript. Saiba mais sobre as diferenças entre v3 e v4 no guia de migração.

Nos trechos de código a seguir, JavaScript (PM4) indica o modelo de programação V4, a nova experiência.

Pré-requisitos

Descrição geral do cenário

Neste exemplo, as funções carregam todos os arquivos em um diretório especificado recursivamente no armazenamento de blob. Eles também contam o número total de bytes que foram carregados.

É possível escrever uma única função que cuida de tudo. O principal problema que você encontraria é a escalabilidade. Uma única execução de função só pode ser executada em uma única máquina virtual, portanto, a taxa de transferência será limitada pela taxa de transferência dessa única VM. Outro problema é a fiabilidade. Se houver uma falha no meio do caminho, ou se todo o processo demorar mais de 5 minutos, o backup poderá falhar em um estado parcialmente concluído. Teria então de ser reiniciado.

Uma abordagem mais robusta seria escrever duas funções regulares: uma enumeraria os arquivos e adicionaria os nomes de arquivo a uma fila e outra leria a fila e carregaria os arquivos para o armazenamento de blobs. Essa abordagem é melhor em termos de taxa de transferência e confiabilidade, mas exige que você provisione e gerencie uma fila. Mais importante ainda, uma complexidade significativa é introduzida em termos de gestão e coordenação do estado se você quiser fazer algo mais, como relatar o número total de bytes carregados.

Uma abordagem de funções duráveis oferece todos os benefícios mencionados com despesas gerais muito baixas.

As funções

Este artigo explica as seguintes funções no aplicativo de exemplo:

  • E2_BackupSiteContent: Uma função orchestrator que chama E2_GetFileList para obter uma lista de arquivos para backup e, em seguida, chama E2_CopyFileToBlob para fazer backup de cada arquivo.
  • E2_GetFileList: Uma função de atividade que retorna uma lista de arquivos em um diretório.
  • E2_CopyFileToBlob: Uma função de atividade que faz backup de um único arquivo no Armazenamento de Blobs do Azure.

E2_BackupSiteContent função orquestradora

Esta função de orquestrador faz essencialmente o seguinte:

  1. Usa um rootDirectory valor como um parâmetro de entrada.
  2. Chama uma função para obter uma lista recursiva de arquivos em rootDirectory.
  3. Faz várias chamadas de função paralelas para carregar cada arquivo no Armazenamento de Blobs do Azure.
  4. Aguarda a conclusão de todos os carregamentos.
  5. Retorna a soma total de bytes que foram carregados no Armazenamento de Blob do Azure.

Aqui está o código que implementa a função orchestrator:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Observe a await Task.WhenAll(tasks); linha. Todas as chamadas individuais para a função não foram esperadas, o E2_CopyFileToBlob que lhes permite correr em paralelo. Quando passamos essa matriz de tarefas para Task.WhenAllo , recebemos de volta uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se você estiver familiarizado com a TPL (Task Parallel Library) no .NET, isso não é novo para você. A diferença é que essas tarefas podem ser executadas em várias máquinas virtuais simultaneamente, e a extensão Durable Functions garante que a execução de ponta a ponta seja resiliente à reciclagem de processos.

Depois de aguardar do , sabemos que todas as chamadas de Task.WhenAllfunção foram concluídas e retornaram os valores de volta para nós. Cada chamada para E2_CopyFileToBlob retorna o número de bytes carregados, portanto, calcular a contagem total de bytes da soma é uma questão de adicionar todos esses valores de retorno juntos.

Funções de atividade auxiliar

As funções de atividade auxiliar, como com outros exemplos, são apenas funções regulares que usam a ligação de activityTrigger gatilho.

E2_GetFileList função de atividade

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Nota

Você pode estar se perguntando por que você não poderia simplesmente colocar esse código diretamente na função de orquestrador. Você poderia, mas isso quebraria uma das regras fundamentais das funções do orquestrador, que é que eles nunca devem fazer E/S, incluindo acesso ao sistema de arquivos local. Para obter mais informações, consulte Restrições de código de função do Orchestrator.

E2_CopyFileToBlob função de atividade

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Nota

Você precisará instalar o Microsoft.Azure.WebJobs.Extensions.Storage pacote NuGet para executar o código de exemplo.

A função usa alguns recursos avançados das associações do Azure Functions (ou seja, o uso do Binder parâmetro), mas você não precisa se preocupar com esses detalhes para o propósito deste passo a passo.

A implementação carrega o arquivo do disco e transmite o conteúdo de forma assíncrona em um blob de mesmo nome no contêiner "backups". O valor de retorno é o número de bytes copiados para o armazenamento, que é usado pela função orchestrator para calcular a soma agregada.

Nota

Este é um exemplo perfeito de como mover operações de E/S para uma activityTrigger função. Não só o trabalho pode ser distribuído por muitas máquinas diferentes, mas você também obtém os benefícios de verificar o progresso. Se o processo do anfitrião for encerrado por qualquer motivo, sabe quais os carregamentos que já foram concluídos.

Executar o exemplo

Você pode iniciar a orquestração, no Windows, enviando a seguinte solicitação HTTP POST.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Como alternativa, em um aplicativo de função Linux (Python atualmente só é executado no Linux for App Service), você pode iniciar a orquestração assim:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Nota

A HttpStart função que você está invocando só funciona com conteúdo formatado em JSON. Por esse motivo, o Content-Type: application/json cabeçalho é necessário e o caminho do diretório é codificado como uma cadeia de caracteres JSON. Além disso, o trecho HTTP pressupõe que há uma entrada no arquivo que remove o prefixo padrão api/ de todas as URLs de funções de gatilho host.json HTTP. Você pode encontrar a marcação para essa configuração no host.json arquivo nos exemplos.

Essa solicitação HTTP aciona o E2_BackupSiteContent orquestrador e passa a string D:\home\LogFiles como um parâmetro. A resposta fornece um link para obter o status da operação de backup:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Dependendo de quantos arquivos de log você tem em seu aplicativo de função, essa operação pode levar vários minutos para ser concluída. Você pode obter o status mais recente consultando a Location URL no cabeçalho da resposta HTTP 202 anterior.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

Neste caso, a função ainda está em execução. Você pode ver a entrada que foi salva no estado do orquestrador e a última hora atualizada. Você pode continuar a usar os valores de Location cabeçalho para sondar para conclusão. Quando o status é "Concluído", você vê um valor de resposta HTTP semelhante ao seguinte:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Agora você pode ver que a orquestração está completa e aproximadamente quanto tempo levou para ser concluída. Você também vê um valor para o output campo, que indica que cerca de 450 KB de logs foram carregados.

Próximos passos

Este exemplo mostrou como implementar o padrão fan-out/fan-in. O próximo exemplo mostra como implementar o padrão de monitor usando temporizadores duráveis.