Flux de données (bibliothèque parallèle de tâches)

La bibliothèque parallèle de tâches (TPL) fournit des composants de flux de données destinés à augmenter la robustesse des applications prenant en charge l’accès concurrentiel. Ces composants de flux de données sont regroupés sous le nom de bibliothèque de flux de données TPL. Ce modèle de flux de données favorise la programmation basée sur les acteurs en fournissant une transmission de messages in-process pour les flux de données à granularité grossière et les tâches de traitement "pipeline". Les composants de flux de données reposent sur les types et l'infrastructure de planification de la bibliothèque parallèle de tâches, et s'intègrent à la prise en charge des langages C#, Visual Basic et F# de la programmation asynchrone. Ces composants de flux de données sont utiles quand vous avez plusieurs opérations qui doivent communiquer entre elles de façon asynchrone ou quand vous voulez traiter les données à mesure qu'elles deviennent disponibles. Prenons par exemple une application qui traite les données d'image d'une webcam. En utilisant le modèle de flux de données, l'application peut traiter les trames d'images à mesure qu'elles deviennent disponibles. Si l’application améliore les trames d’images, par exemple, en effectuant la correction de la lumière ou des yeux rouges, vous pouvez créer un pipeline de composants de flux de données. Chaque étape du pipeline peut utiliser plusieurs fonctionnalités de parallélisme à granularité grossière, telles que les fonctionnalités fournies par la bibliothèque TPL qui permettent de transformer les images.

Ce document fournit une vue d'ensemble de la bibliothèque de flux de données TPL. Il décrit le modèle de programmation et les types de blocs de flux de données prédéfinis, et explique comment configurer des blocs de flux de données pour répondre aux besoins de vos applications.

Notes

La bibliothèque de flux de données TPL (espace de noms System.Threading.Tasks.Dataflow) n'est pas distribuée avec .NET. Pour installer l’espace de noms System.Threading.Tasks.Dataflow dans Visual Studio, ouvrez votre projet, choisissez Gérer les packages NuGet dans le menu Projet, puis recherchez en ligne le package System.Threading.Tasks.Dataflow. Vous pouvez également l’installer à l’aide de l’interface CLI .NET Core en exécutant dotnet add package System.Threading.Tasks.Dataflow.

Modèle de programmation

La bibliothèque de flux de données TPL constitue une base pour la transmission de messages et la parallélisation des applications nécessitant une utilisation importante du processeur et des E/S, et ayant un débit élevé et une faible latence. Elle permet également de contrôler explicitement la manière dont les données sont mises en mémoire tampon et se déplacent sur le système. Pour mieux comprendre le modèle de programmation de flux de données, imaginez une application qui charge des images à partir du disque de manière asynchrone et crée un composite de ces images. Les modèles de programmation traditionnels requièrent généralement l'utilisation de rappels et d'objets de synchronisation, tels que des verrous, pour coordonner les tâches et accéder aux données partagées. À l'aide du modèle de programmation de flux de données, vous pouvez créer des objets de flux de données qui traitent les images à mesure qu'elles sont lues à partir du disque. Sous le modèle de flux de données, vous déclarez la manière dont sont traitées les données quand elles deviennent disponibles, ainsi que les éventuelles dépendances qui existent entre les données. Le runtime gère les dépendances entre les données, ce qui vous évite d'avoir à synchroniser l'accès aux données partagées. De plus, étant donné que le runtime planifie les tâches en fonction de l'arrivée asynchrone des données, le flux de données peut améliorer la réactivité et le débit grâce à une gestion efficace des threads sous-jacents. Vous trouverez un exemple d’utilisation du modèle de programmation de flux de données pour implémenter le traitement d’image dans une application Windows Forms sur la page Procédure pas à pas : utiliser les flux de données dans une application Windows Forms.

Sources et cibles

La bibliothèque de flux de données TPL comprend des blocs de flux de données, structures de données qui mettent les données en mémoire tampon et les traitent. La bibliothèque parallèle de tâches définit trois types de blocs de flux de données : les blocs sources, les blocs cibles et les blocs propagateurs. Un bloc source agit comme une source de données et peut donc être lu. Un bloc cible joue le rôle de récepteur de données, il est donc possible d'y écrire. Un bloc propagateur agit à la fois comme un bloc source et un bloc cible, et peut donc faire l'objet d'une lecture et d'une écriture. La bibliothèque parallèle de tâches définit l'interface System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> pour représenter les sources, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> pour représenter les cibles, et System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> pour représenter les propagateurs. IPropagatorBlock<TInput,TOutput> hérite des deux ISourceBlock<TOutput> et de ITargetBlock<TInput>.

La bibliothèque de flux de données TPL fournit plusieurs types de blocs de flux de données prédéfinis qui implémentent les interfaces ISourceBlock<TOutput>, ITargetBlock<TInput> et IPropagatorBlock<TInput,TOutput>. Ces types de blocs de flux de données sont décrits dans ce document dans la section Types de blocs de flux de données prédéfinis.

Connexion des blocs

Vous pouvez aussi connecter des blocs de flux de données pour former des pipelines, qui sont des séquences linéaires de blocs de flux de données, ou bien des réseaux, qui sont des graphiques de blocs de flux de données. Un pipeline est une forme de réseau. Dans un pipeline ou un réseau, les sources propagent des données de manière asynchrone vers les cibles à mesure que les données deviennent disponibles. La méthode ISourceBlock<TOutput>.LinkTo lie un bloc de flux de données source à un bloc cible. Une source peut être liée à zéro, une ou plusieurs cibles. Une cible peut être liée à zéro, une ou plusieurs sources. Vous pouvez ajouter des blocs de flux de données à un pipeline ou en supprimer de manière simultanée. Les types de blocs de flux de données prédéfinis gèrent tous les aspects liés à la sécurité des threads pour les liaisons et les annulations de liaison.

Vous trouverez un exemple de connexion de blocs de flux de données permettant de former un pipeline de base sur la page Procédure pas à pas : créer un pipeline de flux de données. Vous trouverez un exemple de connexion de blocs de flux de données permettant de former un réseau plus complexe sur la page Procédure pas à pas : utiliser les flux de données dans une application Windows Forms. Vous trouverez un exemple de dissociation d’une cible et d’une source après que la source a envoyé un message à la cible sur la page Guide pratique : dissocier des blocs de flux de données.

Filtrage

Quand vous appelez la méthode ISourceBlock<TOutput>.LinkTo pour lier une source à une cible, vous pouvez fournir un délégué qui détermine si le bloc cible accepte ou rejette les messages en fonction de la valeur de ce message. Ce mécanisme de filtrage permet de garantir qu'un bloc de flux de données recevra uniquement certaines valeurs. Pour la plupart des types de blocs de flux de données prédéfinis, si un bloc source est connecté à plusieurs blocs cibles, quand un bloc cible rejette un message, la source envoie le message à la cible suivante. L'ordre dans lequel une source envoie des messages aux cibles est défini par la source et peut varier en fonction du type de la source. La plupart des types de blocs sources arrêtent d'envoyer un message une fois celui-ci accepté par une cible. La classe BroadcastBlock<T> est une exception. En effet, celle-ci envoie chaque message à l'ensemble des cibles, même si celles-ci le rejettent. Vous trouverez un exemple d’utilisation du filtrage permettant de traiter uniquement certains messages sur la page Procédure pas à pas : utiliser les flux de données dans une application Windows Forms.

Important

Étant donné que chaque type de bloc de flux de données source prédéfini garantit la propagation des messages dans l'ordre où ils sont reçus, tous les messages doivent être lus depuis le bloc source avant que celui-ci ne puisse traiter le message suivant. Par conséquent, quand vous utilisez le filtrage pour connecter plusieurs cibles à une source, assurez-vous que chaque message soit reçu par au moins un bloc cible. Sinon, votre application peut se bloquer.

Transmission de messages

Le modèle de programmation de flux de données est lié au concept de transmission de messages, durant laquelle les composants indépendants d’un programme communiquent entre eux à l’aide de messages. Pour propager des messages entre composants d'application, vous pouvez appeler les méthodes Post (synchrone) et SendAsync (asynchrone) afin d’envoyer des messages aux blocs de flux de données cibles, et les méthodes Receive, ReceiveAsync et TryReceive afin de recevoir des messages provenant de blocs sources. Vous pouvez combiner ces méthodes à des pipelines ou réseaux de flux de données en envoyant des données d'entrée au nœud principal (bloc cible) et en recevant des données de sortie de la part du nœud terminal du pipeline ou des nœuds terminaux du réseau (un ou plusieurs blocs sources). Vous pouvez également utiliser la méthode Choose pour lire la première des sources fournies qui contient des données et effectuer des actions sur ces données.

Les blocs sources envoient des données aux blocs cibles en appelant la méthode ITargetBlock<TInput>.OfferMessage. Le bloc cible peut répondre à un message envoyé de trois manières : il peut accepter le message, le refuser ou le différer. Quand la cible accepte le message, la méthode OfferMessage retourne Accepted. Quand la cible refuse le message, la méthode OfferMessage retourne Declined. Quand la cible décide de ne plus recevoir de messages de la source, OfferMessage renvoie DecliningPermanently. Les types de blocs sources prédéfinis n'envoient plus de messages aux cibles liées après réception d'une telle valeur et sont automatiquement dissociés de ces cibles.

Quand un bloc cible diffère le message pour une utilisation ultérieure, la méthode OfferMessage retourne Postponed. Un bloc cible qui diffère un message peut appeler par la suite la méthode ISourceBlock<TOutput>.ReserveMessage pour tenter de réserver le message envoyé. À ce stade, soit le message est toujours disponible et peut être utilisé par le bloc cible, soit le message a été utilisé par une autre cible. Si, par la suite, le bloc cible demande le message ou n'en a plus besoin, il appellera respectivement la méthode ISourceBlock<TOutput>.ConsumeMessage ou la méthode ReleaseReservation. La réservation de messages est généralement utilisée par les types de blocs de flux de données qui fonctionnent en mode non gourmand. Le mode non gourmand est expliqué plus loin dans ce document. Plutôt que de réserver un message différé, un bloc cible peut également utiliser la méthode ISourceBlock<TOutput>.ConsumeMessage pour tenter d'utiliser directement le message différé.

Achèvement des blocs de flux de données

Les blocs de flux de données prennent également en charge le concept d’achèvement. Un bloc de flux de données qui est achevé n'effectue plus aucune tâche. Chaque bloc de flux de données est associé à un objet System.Threading.Tasks.Task, appelé tâche d'achèvement, qui représente l'état d'achèvement du bloc. Étant donné que vous pouvez attendre qu'un objet Task soit terminé, vous pouvez, à l'aide de tâches d'achèvement, attendre l'achèvement d'un ou plusieurs nœuds terminaux d'un réseau de flux de données. L'interface IDataflowBlock définit la méthode Complete qui informe le bloc de flux de données qu'une requête exige son achèvement, ainsi que la propriété Completion qui retourne la tâche d'achèvement pour le bloc de flux de données. ISourceBlock<TOutput> et ITargetBlock<TInput> héritent de l'interface IDataflowBlock.

Il existe deux façons de déterminer si un bloc de flux de données s'est terminé sans erreur, si une ou plusieurs erreurs se sont produites ou s'il a été annulé. La première consiste à appeler la méthode Task.Wait sur la tâche d'achèvement dans un bloc try-catch (Try-Catch en Visual Basic). Dans l'exemple suivant, un ActionBlock<TInput> objet lève une exception ArgumentOutOfRangeException si sa valeur d'entrée est inférieure à zéro. L’exception AggregateException est levée quand cet exemple appelle Wait sur la tâche d’achèvement. ArgumentOutOfRangeException est accessible via la propriété InnerExceptions de l'objet AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Dans cet exemple, une exception n'est pas gérée dans le délégué d'un bloc de flux de données d'exécution. Nous vous recommandons de gérer les exceptions dans le corps des blocs. Toutefois, si vous ne parvenez pas à le faire, le bloc se comportera comme s'il avait été annulé et ne traitera pas les messages entrants.

Quand un bloc de flux de données est annulé de manière explicite, l'objet AggregateException contient OperationCanceledException dans la propriété InnerExceptions. Pour plus d’informations sur l’annulation de flux de données, consultez la section Permettre les annulations.

La deuxième méthode permettant de déterminer l’état d’achèvement d’un bloc de flux de données est d’utiliser une continuation de la tâche d’achèvement, ou d’utiliser les fonctionnalités de langage asynchrones de C# et de Visual Basic pour attendre la tâche d’achèvement de manière asynchrone. Le délégué que vous fournissez à la méthode Task.ContinueWith prend un objet Task qui représente la tâche précédente. Dans le cas de la propriété Completion, le délégué de la continuation prend la tâche d'achèvement. L’exemple suivant ressemble au précédent, mais il utilise également la méthode ContinueWith pour créer une tâche de continuation qui imprime l’état de l’opération globale de flux de données.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Vous pouvez également utiliser des propriétés telles que IsCanceled dans le corps de la tâche de continuation pour obtenir des informations supplémentaires sur l'état d'achèvement d'un bloc de flux de données. Pour plus d’informations sur les tâches de continuation et leur rôle dans les annulations et la gestion des erreurs, consultez Chaînage des tâches à l’aide de tâches de continuation, Annulation de tâches et Gestion des exceptions.

Types de blocs de flux de données prédéfinis

La bibliothèque de flux de données TPL fournit plusieurs types de blocs de flux de données prédéfinis. Ces types sont répartis en trois catégories : blocs de mise en mémoire tampon, blocs d’exécution et blocs de regroupement. Les sections suivantes décrivent les types de blocs qui composent ces catégories.

Blocs de mise en mémoire tampon

Les blocs de mise en mémoire tampon contiennent des données destinées aux consommateurs de données. La bibliothèque de flux de données TPL fournit trois types de blocs de mise en mémoire tampon : System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> et System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

La classe BufferBlock<T> représente une structure de messagerie asynchrone à usage général. Cette classe stocke une file d'attente de messages de type premier entré, premier sorti (FIFO). Plusieurs cibles peuvent lire ces messages et plusieurs sources peuvent y écrire. Quand une cible reçoit un message d'un objet BufferBlock<T>, ce message est supprimé de la file d'attente. Par conséquent, même si un objet BufferBlock<T> peut avoir plusieurs cibles, seule une cible reçoit chaque message. La classe BufferBlock<T> est utile quand vous voulez transmettre plusieurs messages à un autre composant et que ce composant doit recevoir chaque message.

Dans l'exemple simple qui suit, plusieurs valeurs Int32 sont publiées sur un objet BufferBlock<T>, puis sont lues depuis l'objet.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Vous trouverez un exemple complet montrant comment écrire et lire des messages dans un objet BufferBlock<T> sur la page Guide pratique : écrire et lire des messages dans un bloc de flux de données.

BroadcastBlock<T>

La classe BroadcastBlock<T> est utile quand vous devez transmettre plusieurs messages à un autre composant, mais que ce composant nécessite uniquement la valeur la plus récente. Cette classe est également utile quand vous voulez diffuser un message vers plusieurs composants.

Dans l'exemple simple qui suit, une valeur Double est publiée sur un objet BroadcastBlock<T>, puis est lue plusieurs fois depuis l'objet. Étant donné que les valeurs ne sont pas supprimées des objets BroadcastBlock<T> après leur lecture, la même valeur est disponible à chaque fois.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Vous trouverez un exemple complet montrant comment utiliser BroadcastBlock<T> pour diffuser un message vers plusieurs blocs cibles sur la page Guide pratique : spécifier un Planificateur de tâches dans un bloc de flux de données.

WriteOnceBlock<T>

La classe WriteOnceBlock<T> est similaire à la classe BroadcastBlock<T>, à ceci près qu'il n'est possible d'écrire qu'une seule fois dans un objet WriteOnceBlock<T>. Vous pouvez assimiler WriteOnceBlock<T> au mot clé readonly en C# (ReadOnly en Visual Basic). Toutefois, la différence se trouve dans le fait qu'un objet WriteOnceBlock<T> devient immuable après avoir reçu une valeur, et non à la construction. Comme pour la classe BroadcastBlock<T>, quand une cible reçoit un message d'un objet WriteOnceBlock<T>, le message n'est pas supprimé de l'objet. Par conséquent, plusieurs cibles reçoivent une copie du message. La classe WriteOnceBlock<T> est utile quand vous voulez propager uniquement le premier d'une liste de messages.

Dans l'exemple simple qui suit, plusieurs valeurs String sont publiées sur un objet WriteOnceBlock<T>, puis sont lues depuis l'objet. Étant donné qu'il n'est possible d'écrire dans un objet WriteOnceBlock<T> qu'une seule fois, une fois qu'un objet WriteOnceBlock<T> reçoit un message, il rejette les messages suivants.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Vous trouverez un exemple complet montrant comment utiliser WriteOnceBlock<T> pour recevoir la valeur de la première opération terminée sur la page Guide pratique : dissocier des blocs de flux de données.

Blocs d'exécution

Les blocs d'exécution appellent un délégué fourni par l'utilisateur pour chaque donnée reçue. La bibliothèque de flux de données TPL fournit trois types de blocs d'exécution : ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> et System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

La classe ActionBlock<TInput> est un bloc cible qui appelle un délégué quand il reçoit des données. Imaginez un objet ActionBlock<TInput> utilisé comme un délégué qui s'exécute de façon asynchrone quand des données deviennent disponibles. Le délégué que vous fournissez à un objet ActionBlock<TInput> peut être de type Action<T> ou System.Func<TInput, Task>. Quand vous utilisez un objet ActionBlock<TInput> avec Action<T>, le traitement de chaque élément d'entrée est considéré comme terminé quand le délégué est retourné. Quand vous utilisez un objet ActionBlock<TInput> avec System.Func<TInput, Task>, le traitement de chaque élément d'entrée n'est considéré comme terminé que quand l'objet Task retourné est à l'état achevé. À l'aide de ces deux mécanismes, vous pouvez utiliser ActionBlock<TInput> pour le traitement synchrone et asynchrone de chaque élément d'entrée.

Dans l'exemple simple qui suit, plusieurs valeurs Int32 sont publiées sur un objet ActionBlock<TInput>. L'objet ActionBlock<TInput> imprime ces valeurs dans la console. Cet exemple attribue ensuite au bloc l'état achevé, puis attend que toutes les tâches de flux de données soient terminées.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Vous trouverez des exemples complets montrant comment utiliser des délégués avec la classe ActionBlock<TInput> sur la page Guide pratique : exécuter des actions lorsqu'un bloc de flux de données reçoit des données.

TransformBlock<TInput, TOutput>

La classe TransformBlock<TInput,TOutput> est similaire à la classe ActionBlock<TInput>, à ceci près qu'elle joue à la fois le rôle de source et le rôle de cible. Le délégué que vous passez à un objet TransformBlock<TInput,TOutput> renvoie une valeur de type TOutput. Le délégué que vous fournissez à un objet TransformBlock<TInput,TOutput> peut être de type System.Func<TInput, TOutput> ou System.Func<TInput, Task<TOutput>>. Quand vous utilisez un objet TransformBlock<TInput,TOutput> avec System.Func<TInput, TOutput>, le traitement de chaque élément d'entrée est considéré comme terminé quand le délégué est retourné. Quand vous utilisez un objet TransformBlock<TInput,TOutput> utilisé avec System.Func<TInput, Task<TOutput>>, le traitement de chaque élément d'entrée n'est considéré comme terminé que quand l'objet Task<TResult> retourné est à l'état achevé. Comme pour ActionBlock<TInput>, ces deux mécanismes vous permettent d'utiliser TransformBlock<TInput,TOutput> pour le traitement synchrone et asynchrone de chaque élément d'entrée.

Dans l'exemple simple qui suit, l'objet TransformBlock<TInput,TOutput> créé calcule la racine carrée de son entrée. L'objet TransformBlock<TInput,TOutput> prend des valeurs Int32 comme entrée et produit des valeurs Double comme sortie.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Vous trouverez des exemples complets utilisant TransformBlock<TInput,TOutput> dans un réseau de blocs de flux de données qui réalise un traitement d’image dans une application Windows Forms sur la page Procédure pas à pas : utiliser un flux de données dans une application Windows Forms.

TransformManyBlock<TInput, TOutput>

La classe TransformManyBlock<TInput,TOutput> est similaire à la classe TransformBlock<TInput,TOutput>, à ceci près que TransformManyBlock<TInput,TOutput> produit zéro, une ou plusieurs valeurs de sortie pour chaque valeur d'entrée, au lieu d'une seule valeur de sortie pour chaque valeur d'entrée. Le délégué que vous fournissez à un objet TransformManyBlock<TInput,TOutput> peut être de type System.Func<TInput, IEnumerable<TOutput>> ou System.Func<TInput, Task<IEnumerable<TOutput>>>. Quand vous utilisez un objet TransformManyBlock<TInput,TOutput> avec System.Func<TInput, IEnumerable<TOutput>>, le traitement de chaque élément d'entrée est considéré comme terminé quand le délégué est retourné. Quand vous utilisez un objet TransformManyBlock<TInput,TOutput> avec System.Func<TInput, Task<IEnumerable<TOutput>>>, le traitement de chaque élément d'entrée n'est considéré comme terminé que quand l'objet System.Threading.Tasks.Task<IEnumerable<TOutput>> retourné est à l'état achevé.

Dans l'exemple simple qui suit, l'objet TransformManyBlock<TInput,TOutput> créé fractionne les chaînes en séquences de caractères. L'objet TransformManyBlock<TInput,TOutput> prend des valeurs String comme entrée et produit des valeurs Char comme sortie.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Vous trouverez des exemples complets utilisant TransformManyBlock<TInput,TOutput> pour produire plusieurs sorties indépendantes pour chaque entrée dans un pipeline de flux de données sur la page Procédure pas à pas : créer un pipeline de flux de données.

Degré de parallélisme

Chaque objet ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput> met en mémoire tampon les messages entrants jusqu'à ce que le bloc soit prêt à les traiter. Par défaut, ces classes de traitent les messages un par un, dans l'ordre dans lequel ils sont reçus. Vous pouvez également spécifier le degré de parallélisme permettant aux objets ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput> de traiter plusieurs messages simultanément. Pour plus d'informations sur l'exécution simultanée, consultez la section "Spécification du degré de parallélisme" plus loin dans ce document. Vous trouverez un exemple dans lequel un degré de parallélisme est défini pour permettre à un bloc de flux de données d’exécution de traiter plusieurs messages à la fois sur la page Guide pratique : spécifier le degré de parallélisme dans un bloc de flux de données.

Récapitulation des types délégués

Le tableau suivant récapitule les types délégués que vous pouvez fournir aux objets ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput>. Le tableau indique également si le type délégué fonctionne de façon synchrone ou asynchrone.

Type Type délégué synchrone Type délégué asynchrone
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Vous pouvez également utiliser des expressions lambda quand vous utilisez des types de blocs d'exécution. Vous trouverez un exemple qui montre comment utiliser une expression lambda avec un bloc d’exécution sur la page Guide pratique : exécuter des actions lorsqu’un bloc de flux de données reçoit des données.

Blocs de regroupement

Les blocs de regroupement combinent les données d'une ou plusieurs sources, sous diverses contraintes. La bibliothèque de flux de données TPL fournit trois types de blocs de regroupement : BatchBlock<T>, JoinBlock<T1,T2> et BatchedJoinBlock<T1,T2>.

BatchBlock<T>

La classe BatchBlock<T> combine des jeux de données d'entrée (ou lots), dans des tableaux de données de sortie. Vous spécifiez la taille de chaque lot quand vous créez un objet BatchBlock<T>. Quand l'objet BatchBlock<T> reçoit le nombre spécifié d'éléments d'entrée, il propage de manière asynchrone un tableau contenant ces éléments. Si un objet BatchBlock<T> est à l'état achevé, mais ne contient pas suffisamment d'éléments pour former un lot, il propage un tableau final contenant les éléments d'entrée restants.

La classe BatchBlock<T> peut fonctionner en mode gourmand ou non gourmand. En mode gourmand, qui est le mode par défaut, un objet BatchBlock<T> accepte tous les messages qui lui sont envoyés et propage un tableau après avoir reçu le nombre spécifié d'éléments. En mode non gourmand, un objet BatchBlock<T> diffère tous les messages entrants jusqu'à ce que suffisamment de sources aient envoyé au bloc un nombre de messages permettant de former un lot. Le mode gourmand est généralement plus performant que le mode non gourmand, car il nécessite une charge de traitement moindre. Toutefois, vous pouvez utiliser le mode non gourmand quand vous devez coordonner la consommation de plusieurs sources de façon atomique. Pour spécifier le mode non gourmand, définissez Greedy sur False dans le paramètre dataflowBlockOptions du constructeur BatchBlock<T>.

Dans l'exemple simple qui suit, plusieurs valeurs Int32 sont publiées sur un objet BatchBlock<T> qui contient un lot de dix éléments. Pour garantir que toutes les valeurs de BatchBlock<T> soient propagées, cet exemple appelle la méthode Complete. La méthode Complete définit l'objet BatchBlock<T> sur l'état achevé. L'objet BatchBlock<T> propage donc les éléments restants dans un dernier lot.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Vous trouverez un exemple complet utilisant BatchBlock<T> pour améliorer l'efficacité des opérations d'insertion en base de données sur la page Procédure pas à pas : utiliser BatchBlock et BatchedJoinBlock pour améliorer l'efficacité.

JoinBlock<T1, T2, ...>

Les classes JoinBlock<T1,T2> et JoinBlock<T1,T2,T3> collectent des éléments d'entrée et propagent les objets System.Tuple<T1,T2> ou System.Tuple<T1,T2,T3> qui contiennent ces éléments. Les classes JoinBlock<T1,T2> et JoinBlock<T1,T2,T3> n'héritent pas de ITargetBlock<TInput>. Au lieu de cela, elles fournissent les propriétés Target1, Target2 et Target3 qui implémentent ITargetBlock<TInput>.

Comme BatchBlock<T>, JoinBlock<T1,T2> et JoinBlock<T1,T2,T3> peuvent fonctionner en mode gourmand ou non gourmand. En mode gourmand, qui est le mode par défaut, un objet JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> accepte tous les messages qui lui sont envoyés et propage un tuple chaque fois que l'une de ses cibles reçoit au moins un message. En mode non gourmand, un objet JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> diffère tous les messages entrants jusqu'à ce que toutes les cibles aient reçu les données requises pour créer un tuple. À ce stade, le bloc s'engage dans un protocole de validation en deux phases pour récupérer atomiquement tous les éléments requis à partir des sources. Ce report permet à une autre entité de consommer les données pendant ce temps, pour que l'ensemble du système puisse progresser.

Dans l'exemple simple qui suit, un objet JoinBlock<T1,T2,T3> nécessite plusieurs données pour calculer une valeur. Dans cet exemple, l'objet JoinBlock<T1,T2,T3> créé nécessite deux valeurs Int32 et une valeur Char pour effectuer une opération arithmétique.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Vous trouverez un exemple complet utilisant des objets JoinBlock<T1,T2> en mode non gourmand pour partager une ressource de manière coopérative sur la page Guide pratique : utiliser JoinBlock pour lire des données issues de plusieurs sources.

BatchedJoinBlock<T1, T2, ...>

Les classes BatchedJoinBlock<T1,T2> et BatchedJoinBlock<T1,T2,T3> collectent des lots d'éléments d'entrée et propagent les objets System.Tuple(IList(T1), IList(T2)) ou System.Tuple(IList(T1), IList(T2), IList(T3)) qui contiennent ces éléments. BatchedJoinBlock<T1,T2> est un mélange entre BatchBlock<T> et JoinBlock<T1,T2>. Vous spécifiez la taille de chaque lot quand vous créez un objet BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> fournit également les propriétés Target1 et Target2 qui implémentent ITargetBlock<TInput>. Quand le nombre spécifié d'éléments d'entrée est reçu par l'ensemble des cibles, l'objet BatchedJoinBlock<T1,T2> propage de manière asynchrone un objet System.Tuple(IList(T1), IList(T2)) qui contient ces éléments.

Dans l'exemple simple qui suit, l'objet BatchedJoinBlock<T1,T2> créé contient des résultats, des valeurs Int32 et des erreurs qui sont des objets Exception. Dans cet exemple, plusieurs opérations sont effectuées. Les résultats sont écrits dans la propriété Target1 et les erreurs dans la propriété Target2 de l'objet BatchedJoinBlock<T1,T2>. Étant donné que le nombre d'opérations ayant réussi et ayant échoué n'est pas connu à l'avance, les objets IList<T> permettent à chaque cible de recevoir zéro, une ou plusieurs valeurs.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Vous trouverez un exemple complet utilisant BatchedJoinBlock<T1,T2> pour capturer les résultats et toutes les exceptions qui se produisent quand le programme lit les données d'une base de données sur la page Procédure pas à pas : utiliser BatchBlock et BatchedJoinBlock pour améliorer l'efficacité.

Configuration du comportement des blocs de flux de données

Vous pouvez activer des options supplémentaires en fournissant un objet System.Threading.Tasks.Dataflow.DataflowBlockOptions au constructeur de types de blocs de flux de données. Ces options permettent de contrôler le comportement, comme celui du planificateur qui gère la tâche sous-jacente et le degré de parallélisme. DataflowBlockOptions possède également des types dérivés qui spécifient le comportement spécifique à certains types de blocs de flux de données. Le tableau suivant récapitule les types d'options qui sont associés à chaque type de bloc de flux de données.

Type de bloc de flux de données TypeDataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Les sections suivantes fournissent des informations supplémentaires sur les types importants d'options de blocs de flux de données qui sont disponibles via les classes System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions et System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Spécification du planificateur de tâches

Chaque bloc de flux de données prédéfini utilise le mécanisme de planification des tâches TPL pour effectuer des activités, telles que la propagation de données vers une cible, la réception de données à partir d'une source et l'exécution de délégués définis par l'utilisateur quand des données deviennent disponibles. TaskScheduler est une classe abstraite qui représente un planificateur de tâches qui place des tâches en attente dans des threads. Le planificateur de tâches par défaut, Default, utilise la classe ThreadPool pour placer des tâches en file d’attente et les exécuter. Vous pouvez remplacer le planificateur de tâches par défaut en définissant la propriété TaskScheduler quand vous créez un objet de bloc de flux de données.

Quand un même planificateur de tâches gère plusieurs blocs de flux de données, il peut appliquer les mêmes stratégies à chacune d'elles. Par exemple, si plusieurs blocs de flux de données sont configurés de manière à cibler le planificateur exclusif du même objet ConcurrentExclusiveSchedulerPair, toutes les tâches qui sont exécutées dans ces blocs seront sérialisées. De même, si ces blocs sont configurés de manière à cibler le planificateur simultané du même objet ConcurrentExclusiveSchedulerPair et que le planificateur est configuré avec un niveau d'accès concurrentiel maximal, toutes les tâches de ces blocs seront limitées au nombre d'opérations simultanées. Vous trouverez un exemple utilisant la classe ConcurrentExclusiveSchedulerPair pour permettre que des opérations de lecture s’effectuent en parallèle, tout en imposant que chaque opération d’écriture soit réalisée de manière exclusive sur la page Guide pratique : spécifier un Planificateur de tâches dans un bloc de flux de données. Pour plus d’informations sur les planificateurs de tâches dans la bibliothèque parallèle de tâches, consultez la rubrique sur la classe TaskScheduler.

Spécification du degré de parallélisme

Par défaut, les trois types de blocs d'exécution fournis par la bibliothèque de flux de données TPL (ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput>) traitent les messages un par un. Ces types de blocs de flux de données traitent également les messages dans l'ordre dans lequel ils sont reçus. Pour permettre aux blocs de flux de données de traiter simultanément les messages, définissez la propriété ExecutionDataflowBlockOptions.MaxDegreeOfParallelism au moment de créer l'objet de bloc de flux de données.

La valeur par défaut de MaxDegreeOfParallelism est 1, ce qui signifie que le bloc de flux de données traitera les messages un par un. En définissant cette propriété sur une valeur supérieure à 1, vous permettez au bloc de flux de données de traiter plusieurs messages simultanément. Si vous définissez cette propriété sur DataflowBlockOptions.Unbounded, vous permettez au planificateur de tâches sous-jacentes de gérer le degré maximal de concurrence.

Important

Quand vous spécifiez un degré maximal de parallélisme supérieur à 1, plusieurs messages sont traités simultanément. Il se peut donc que les messages ne soient pas traités dans l’ordre dans lequel ils sont reçus. Toutefois, les messages sont renvoyés du bloc dans le même ordre qu’ils ont été reçus.

Étant donné que la propriété MaxDegreeOfParallelism représente le degré maximal de parallélisme, le bloc de flux de données peut s'exécuter avec un degré de parallélisme moindre spécifié par vos soins. Le bloc de flux de données peut utiliser un degré de parallélisme moindre pour satisfaire ses exigences fonctionnelles ou en cas de manque de ressources système disponibles. Un bloc de flux de données n'utilisera jamais un niveau de parallélisme supérieur à celui que vous spécifiez.

La valeur de la propriété MaxDegreeOfParallelism est exclusive à chaque objet de bloc de flux de données. Par exemple, si chacun des quatre objets de blocs de flux de données spécifie la valeur 1 comme degré maximal de parallélisme, les quatre objets de bloc de flux de données peuvent être exécutés en parallèle.

Vous trouverez un exemple dans lequel est défini le degré maximal de parallélisme permettant à des opérations de longue durée d’être exécutées en parallèle sur la page Guide pratique : spécifier le degré de parallélisme dans un bloc de flux de données.

Spécification du nombre de messages par tâche

Les types de blocs de flux de données prédéfinis utilisent des tâches pour traiter plusieurs éléments d'entrée. Cela aide à réduire le nombre d'objets de tâches requis pour traiter les données, ce qui permet aux applications de s'exécuter plus efficacement. Toutefois, quand les tâches d'un ensemble de blocs de flux de données traitent des données, il est possible que les tâches des autres blocs de flux de données doivent attendre d'être traitées et placer leurs messages dans la file d'attente. Pour un meilleur respect de l'ordre des tâches de flux de données, définissez la propriété MaxMessagesPerTask. Quand MaxMessagesPerTask a la valeur DataflowBlockOptions.Unbounded, qui est la valeur par défaut, la tâche utilisée par un bloc de flux de données traite tous les messages disponibles. Quand MaxMessagesPerTask est défini sur une valeur autre que Unbounded, le bloc de flux de données traite au maximum le nombre défini de messages par objet Task. Même si la configuration de la propriété MaxMessagesPerTask peut améliorer le respect de l'ordre des tâches, elle peut aussi entraîner la création par le système de davantage de tâches que nécessaire, réduisant ainsi les performances.

Permettre les annulations

La bibliothèque parallèle de tâches (TPL) fournit un mécanisme qui permet aux tâches de coordonner l'annulation de manière coopérative. Pour permettre aux blocs de flux de données de participer à ce mécanisme d'annulation, définissez la propriété CancellationToken. Quand l'objet CancellationToken est à l'état annulé, tous les blocs de flux de données qui contrôlent ce jeton terminent l'exécution de l'élément actuel, mais ne démarrent pas le traitement des éléments suivants. De plus, ces blocs de flux de données effacent les messages mis en mémoire tampon, libèrent les connexions aux blocs sources et cibles, et passent à l'état annulé. Lors du passage à l'état annulé, la propriété Status de la propriété Completion est définie sur Canceled, sauf si une exception s'est produite lors du traitement. Dans ce cas, Status est défini sur Faulted.

Vous trouverez un exemple qui montre comment utiliser l’annulation dans une application Windows Forms sur la page Guide pratique : annuler un bloc de flux de données. Pour plus d’informations sur les annulations dans la bibliothèque parallèle de tâches, consultez la page Annulation de tâches.

Spécification des comportements gourmand et non gourmand

Plusieurs types de blocs de flux de données de regroupement peuvent fonctionner en mode gourmand ou en mode non gourmand. Par défaut, les types de blocs de flux de données prédéfinis fonctionnent en mode gourmand.

Pour les types de blocs de regroupement tels que JoinBlock<T1,T2>, le mode gourmand signifie que le bloc accepte immédiatement les données, même si les données correspondantes avec lesquelles effectuer le regroupement ne sont pas encore disponibles. Le mode non gourmand signifie que le bloc diffère tous les messages entrants jusqu'à ce que chacune de ses cibles ait reçu un message, permettant ainsi le regroupement. Si l'un des messages différés n'est plus disponible, le bloc de regroupement libère tous les messages différés et redémarre le processus. Pour la classe BatchBlock<T>, les comportements gourmand et non gourmand sont similaires, à ceci près qu'en mode non gourmand, un objet BatchBlock<T> diffère tous les messages entrants jusqu'à ce que suffisamment de messages soient disponibles dans plusieurs sources distinctes pour former un lot.

Pour spécifier le mode non gourmand pour un bloc de flux de données, définissez Greedy sur False. Vous trouverez un exemple qui montre comment utiliser le mode non gourmand pour permettre à plusieurs blocs de regroupement de partager une source de données plus efficacement sur la page Guide pratique : utiliser JoinBlock pour lire des données issues de plusieurs sources.

Blocs de flux de données personnalisés

Même si la bibliothèque de flux de données TPL fournit de nombreux types de blocs prédéfinis, vous pouvez créer d'autres types de blocs ayant un comportement personnalisé. Implémentez directement les interfaces ISourceBlock<TOutput> ou ITargetBlock<TInput>, ou utilisez la méthode Encapsulate pour créer un bloc complexe qui encapsule le comportement des types de blocs existants. Vous trouverez des exemples qui montrent comment implémenter la fonctionnalité de bloc de flux de données personnalisé sur la page Procédure pas à pas : créer un type de bloc de flux de données personnalisé.

Intitulé Description
Procédure : Écrire et lire des messages dans un bloc de flux de données Montre comment écrire des messages dans un objet BufferBlock<T> et les lire.
Procédure : implémenter un modèle de flux de données producteur-consommateur Explique comment utiliser le modèle de flux de données pour implémenter un modèle producteur-consommateur, où le producteur envoie des messages à un bloc de flux de données et le consommateur lit les messages de ce bloc.
Procédure : Exécuter des actions quand un bloc de flux de données reçoit des données Explique comment fournir des délégués aux types de blocs de flux de données d'exécution ActionBlock<TInput>, TransformBlock<TInput,TOutput> et TransformManyBlock<TInput,TOutput>.
Procédure pas à pas : création d’un pipeline de dataflow Explique comment créer un pipeline de flux de données qui télécharge du texte à partir du web et effectue des opérations sur ce texte.
Procédure : Dissocier des blocs de flux de données Montre comment utiliser la méthode LinkTo pour dissocier un bloc cible de sa source après que celle-ci lui a envoyé un message.
Procédure pas à pas : utilisation d’un dataflow dans une application Windows Forms Montre comment créer un réseau de blocs de flux de données qui effectuent un traitement des images dans une application Windows Forms.
Procédure : annuler un bloc de dataflow Montre comment utiliser l'annulation dans une application Windows Forms.
Procédure : Utiliser JoinBlock pour lire des données de plusieurs sources Explique comment utiliser la classe JoinBlock<T1,T2> pour effectuer une opération quand des données sont disponibles dans plusieurs sources, et comment utiliser le mode non gourmand pour permettre à plusieurs blocs de regroupement de partager une source de données plus efficacement.
Procédure : Spécifier le degré de parallélisme dans un bloc de flux de données Explique comment définir la propriété MaxDegreeOfParallelism pour permettre à un bloc de flux de données d'exécution de traiter plusieurs messages à la fois.
Procédure : spécifier un planificateur de tâches dans un bloc de dataflow Montre comment associer un planificateur de tâches spécifique quand vous utilisez un flux de données dans votre application.
Procédure pas à pas : Utiliser BatchBlock et BatchedJoinBlock pour améliorer l’efficacité Explique comment utiliser la classe BatchBlock<T> pour améliorer l'efficacité des opérations d'insertion de bases de données, et comment utiliser la classe BatchedJoinBlock<T1,T2> pour capturer les résultats et les exceptions qui se produisent quand le programme lit les données d'une base de données.
Procédure pas à pas : Créer un type de bloc de flux de données personnalisé Montre deux façons de créer un type de bloc de flux de données qui implémente un comportement personnalisé.
Bibliothèque parallèle de tâches Présente la bibliothèque parallèle de tâches (TPL), qui simplifie la programmation parallèle et simultanée dans les applications .NET Framework.