Passo a passo: criar um pipeline de fluxo de dados

Embora possa usar os métodos DataflowBlock.Receive, DataflowBlock.ReceiveAsync e DataflowBlock.TryReceive para receber mensagens dos blocos de origem, você também pode conectar blocos de mensagens para formar um pipeline de fluxo de dados. Um pipeline de fluxo de dados é uma série de componentes, ou blocos de fluxo de dados, e cada uma série executa uma tarefa específica que contribui para um objetivo maior. Todos os blocos de fluxo de dados em um pipeline de fluxo de dados realizam trabalhos ao receber uma mensagem de outro bloco de fluxo de dados. Como analogia, podemos usar uma linha de montagem de automóveis. À medida que os veículos passam por ela, uma estação monta a carroceria, a seguinte instala o motor e assim por diante. Como a linha de montagem permite que vários veículos sejam montados ao mesmo tempo, seu desempenho é superior se comparado com a montagem de um veículo completo por vez.

Este documento demonstra um pipeline de fluxo de dados que baixa o catálogo A Ilíada de Homero de um site e pesquisa o texto para fazer a correspondência de palavras individuais com palavras que invertem os primeiros caracteres da palavra. A formação do pipeline de fluxo de dados neste documento conta com as seguintes etapas:

  1. Crie os blocos de fluxo de dados que participam do pipeline.

  2. Conecte cada bloco de fluxo de dados ao próximo bloco do pipeline. Cada bloco recebe como entrada a saída do bloco anterior no pipeline.

  3. Para cada bloco de fluxo de dados, crie uma tarefa de continuação que definirá o estado do próximo bloco como concluído após o término do bloco anterior.

  4. Publique dados no cabeçalho do pipeline.

  5. Marque o cabeçalho do pipeline como concluído.

  6. Aguarde até que o pipeline conclua todo o trabalho.

Pré-requisitos

Leia sobre o Fluxo de dados antes de iniciar essa explicação passo a passo.

Criar um Aplicativo de console

No Visual Studio, crie um projeto de aplicativo de console do Visual Basic ou do Visual C#. Instale o pacote NuGet de System.Threading.Tasks.Dataflow.

Observação

A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o projeto, escolha Gerenciar Pacotes NuGet no menu Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow.

Adicione o seguinte código ao projeto para criar o aplicativo básico.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Criar os blocos de fluxo de dados

Adicione o seguinte código ao método Main para criar os blocos de fluxo de dados que participam do pipeline. A tabela a seguir resume a função de cada membro do pipeline.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Membro Type Descrição
downloadString TransformBlock<TInput,TOutput> Baixa o texto do catálogo na Web.
createWordList TransformBlock<TInput,TOutput> Separa o texto do catálogo em uma matriz de palavras.
filterWordList TransformBlock<TInput,TOutput> Remove palavras curtas e duplicadas da matriz de palavras.
findReversedWords TransformManyBlock<TInput,TOutput> Localiza todas as palavras na coleção de matrizes de palavras filtradas cujo inverso também ocorre na matriz de palavras.
printReversedWords ActionBlock<TInput> Exibe palavras e o inverso correspondente para o console.

Embora você possa combinar várias etapas no pipeline de fluxo de dados deste exemplo em uma única etapa, o exemplo ilustra o conceito de composição de várias tarefas independentes do fluxo de dados para executar uma tarefa maior. O exemplo usa TransformBlock<TInput,TOutput> para permitir que cada membro do pipeline execute uma operação em seus dados de entrada e envie os resultados para a próxima etapa do pipeline. O membro findReversedWords do pipeline é um objeto TransformManyBlock<TInput,TOutput> porque produz várias saídas independentes para cada entrada. A parte final do pipeline, printReversedWords, é um objeto ActionBlock<TInput> porque executa uma ação na entrada, mas não produz um resultado.

Formação do pipeline

Adicione o código a seguir para conectar cada bloco ao próximo bloco do pipeline.

Quando você chama o método LinkTo para conectar um bloco de fluxo de dados de origem a um bloco de fluxo de dados de destino, o bloco de fluxo de dados de origem propaga dados para o bloco de destino assim que os dados ficam disponíveis. Se você também fornecer DataflowLinkOptions com PropagateCompletion definido como "true", a conclusão bem-sucedida ou não de um bloco do pipeline levará à conclusão do próximo bloco do pipeline.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Publicar dados no pipeline

Adicione o seguinte código para publicar a URL do catálogo A Ilíada de Homero no cabeçalho do pipeline de fluxo de dados.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Este exemplo usa DataflowBlock.Post para enviar dados de forma síncrona para o cabeçalho do pipeline. Use o método DataflowBlock.SendAsync quando precisar enviar dados de forma assíncrona para um nó do fluxo de dados.

Concluir a atividade do pipeline

Adicione o seguinte código para marcar o cabeçalho do pipeline como concluído. O cabeçalho do pipeline propaga sua conclusão após processar todas as mensagens em buffer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Este exemplo envia uma URL pelo pipeline de fluxo de dados que será processado. Se você enviar mais de uma entrada por um pipeline, chame o método IDataflowBlock.Complete após enviar todas as entradas. Você pode omitir essa etapa se seu aplicativo não tiver pontos bem definidos nos quais os dados já não estarão disponíveis ou se o aplicativo não precisar aguardar a conclusão do pipeline.

Aguardar a conclusão do pipeline

Adicione o seguinte código para aguardar a conclusão do pipeline. A operação geral termina com a conclusão da parte final do pipeline.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Você pode aguardar a conclusão do fluxo de dados de qualquer thread ou de vários threads ao mesmo tempo.

O Exemplo Completo

O exemplo a seguir mostra o código completo dessa explicação passo a passo.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Próximas etapas

Este exemplo envia uma URL para ser processada pelo fluxo de dados do pipeline. Se você enviar mais de um valor de entrada pelo pipeline, introduza um formulário de paralelismo no aplicativo, semelhante à forma como as peças seriam movimentadas em uma fábrica de automóveis. Após o primeiro membro do pipeline enviar o resultado para o segundo membro, o primeiro membro poderá processar outro item em paralelo enquanto o segundo membro processa o primeiro resultado.

O paralelismo obtido usando pipelines de fluxo de dados é conhecido como paralelismo de alta granularidade porque, geralmente, é formado por menos tarefas, porém maiores. Você também pode usar um paralelismo mais refinado com tarefas menores e de curta execução em um pipeline de fluxo de dados. Neste exemplo, o membro findReversedWords do pipeline usa o PLINQ para processar em paralelo vários itens da lista de trabalho. O uso do paralelismo refinado em um pipeline de alta granularidade pode melhorar o desempenho geral.

Você também pode conectar um bloco de fluxo de dados de origem a vários blocos de destino para criar uma rede de fluxo de dados. A versão sobrecarregada do método LinkTo usa um objeto Predicate<T> que define se o bloco de destino aceita cada mensagem com base em seu valor. A maioria dos tipos de blocos de fluxo de dados que agem como fontes fornecem mensagens para todos os blocos de destino conectados, na ordem em que estavam conectados, até que um dos blocos aceite essa mensagem. Com esse mecanismo de filtragem, é possível criar sistemas de blocos de fluxo de dados conectados que direcionam determinados dados por um caminho e outros dados por meio de outro caminho. Para ver um exemplo que usa a filtragem para criar uma rede de fluxo de dados, confira Explicação passo a passo: usar o fluxo de dados em um aplicativo do Windows Forms.

Confira também