Méthodes d'exécution d'un composant de flux de données
Au moment de l'exécution, la tâche de flux de données examine la séquence de composants, prépare un plan d'exécution et gère un pool de threads de travail qui exécute le plan de travail. La tâche charge des lignes de données à partir des sources, les traite via des transformations, puis les enregistre dans des destinations.
Séquence d'exécution des méthodes
Pendant l'exécution d'un composant de flux de données, un sous-ensemble des méthodes de la classe de base PipelineComponent est appelé. Les méthodes, et l'ordre dans lequel elles sont appelées, sont toujours les mêmes, à l'exception des méthodes PrimeOutput et ProcessInput. Ces deux méthodes sont appelées en fonction de l'existence et de la configuration des objets IDTSInput100 et IDTSOutput100 d'un composant.
La liste suivante présente les méthodes dans l'ordre dans lequel elles sont appelées pendant l'exécution d'un composant. Notez que lorsqu'elle est appelée, la méthode PrimeOutput l'est toujours avant la méthode ProcessInput.
Méthode PrimeOutput
La méthode PrimeOutput est appelée lorsqu'un composant possède au moins une sortie, attachée à un composant en aval via un objet IDTSPath100 et lorsque la propriété SynchronousInputID de la sortie est nulle. La méthode PrimeOutput est appelée pour les composants source et les transformations à sorties asynchrones. Contrairement à la méthode ProcessInput décrite ci-dessous, la méthode PrimeOutput est appelée une seule fois pour chaque composant qui la requiert.
Méthode ProcessInput
La méthode ProcessInput est appelée pour les composants qui possèdent au moins une entrée attachée à un composant en amont par un objet IDTSPath100. La méthode ProcessInput est appelée pour les composants de destination et les transformations à sorties synchrones. l'ProcessInput est appelée à plusieurs reprises jusqu'à ce qu'il n'y ait plus aucune ligne à traiter à partir des composants en amont.
Utilisation des entrées et des sorties
Au moment de l'exécution, les composants de flux de données effectuent les tâches suivantes :
Les composants source ajoutent des lignes.
Les composants de transformation à sorties synchrones reçoivent les lignes fournies par les composants source.
Les composants de transformation à sorties asynchrones reçoivent des lignes et en ajoutent.
Les composants de destination reçoivent des lignes, puis les chargent dans une destination.
Pendant l'exécution, la tâche de flux de données alloue des objets PipelineBuffer qui contiennent toutes les colonnes définies dans les collections de colonnes de sortie d'une séquence de composants. Par exemple, si chacun des quatre composants d'une séquence de flux de données ajoute une colonne de sortie à sa collection de colonnes de sortie, le tampon fourni à chaque composant contient quatre colonnes, une pour chaque colonne de sortie par composant. En raison de ce comportement, un composant reçoit parfois des tampons qui contiennent des colonnes qu'il n'utilise pas.
Étant donné que les tampons reçus par votre composant peuvent contenir des colonnes que le composant n'utilisera pas, vous devez localiser les colonnes que vous souhaitez utiliser dans les collections de colonnes d'entrée et de sortie de votre composant dans le tampon fourni au composant par la tâche de flux de données. Pour ce faire, vous utilisez la méthode FindColumnByLineageID de la propriété BufferManager. Pour des raisons liées aux performances, cette tâche s'effectue en général pendant la méthode PreExecute, plutôt que pendant la méthode PrimeOutput ou ProcessInput.
La méthode PreExecute est appelée avant les méthodes PrimeOutput et ProcessInput et correspond à la première possibilité pour un composant d'effectuer ce travail une fois que la propriété BufferManager est devenue disponible pour le composant. Pendant cette méthode, le composant doit localiser ses colonnes dans les tampons et stocker en interne ces informations afin que les colonnes puissent être utilisées dans les méthodes PrimeOutput ou ProcessInput.
L'exemple de code suivant montre comment un composant de transformation à sortie synchrone localise ses colonnes d'entrée dans le tampon pendant la méthode PreExecute.
private int []bufferColumnIndex;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
bufferColumnIndex = new int[input.InputColumnCollection.Count];
for( int x=0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);
}
}
Dim bufferColumnIndex As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim bufferColumnIndex(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)
bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)
Next
End Sub
Ajout de lignes
Les composants fournissent des lignes aux composants en aval en ajoutant des lignes aux objets PipelineBuffer. La tâche de flux de données fournit un tableau de tampons de sortie, un pour chaque objet IDTSOutput100 connecté à un composant en aval, en tant que paramètre de la méthode PrimeOutput. Les composants source et de transformation à sorties asynchrones ajoutent des lignes aux tampons et appellent la méthode SetEndOfRowset lorsqu'ils ont terminé d'ajouter des lignes. La tâche de flux de données gère les tampons de sortie qu'elle fournit aux composants et, lorsque le tampon sature, déplace automatiquement les lignes incluses dans le tampon vers le composant suivant. La méthode PrimeOutput est appelée une fois par composant, contrairement à la méthode ProcessInput, appelée à plusieurs reprises.
L'exemple de code suivant montre comment un composant ajoute des lignes à ses tampons de sortie pendant la méthode PrimeOutput, puis appelle la méthode SetEndOfRowset.
public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)
{
for( int x=0; x < outputs; x++ )
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);
PipelineBuffer buffer = buffers[x];
// TODO: Add rows to the output buffer.
}
foreach( PipelineBuffer buffer in buffers )
{
/// Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset();
}
}
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)
For x As Integer = 0 To outputs.MaxValue
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))
Dim buffer As PipelineBuffer = buffers(x)
' TODO: Add rows to the output buffer.
Next
For Each buffer As PipelineBuffer In buffers
' Notify the data flow task that no more rows are coming.
buffer.SetEndOfRowset()
Next
End Sub
Pour plus d'informations sur le développement des composants qui ajoutent des lignes aux tampons de sortie, consultez Développement d'un composant source personnalisé et Développement d'un composant de transformation personnalisé à sorties asynchrones.
Réception de lignes
Les composants reçoivent des lignes provenant des composants en amont dans les objets PipelineBuffer. La tâche de flux de données fournit un objet PipelineBuffer qui contient les lignes ajoutées au flux de données par les composants en amont en tant que paramètre de la méthode ProcessInput. Ce tampon d'entrée peut être utilisé pour examiner et modifier les lignes et colonnes dans le tampon, mais il ne permet pas d'ajouter ou de supprimer des lignes. La méthode ProcessInput est appelée à plusieurs reprises jusqu'à ce qu'il n'y ait plus de tampon disponible. Lors du dernier appel, la propriété EndOfRowset a pour valeur true. Vous pouvez parcourir la collection de lignes dans le tampon en utilisant la méthode NextRow, qui permet d'accéder à la ligne suivante du tampon. Cette méthode retourne false lorsque le tampon est sur la dernière ligne de la collection. Vous n'avez pas à vérifier la propriété EndOfRowset sauf si vous devez effectuer une action supplémentaire une fois que les dernières lignes de données ont été traitées.
Le texte suivant illustre le modèle correct pour l'utilisation de la méthode NextRow et de la propriété EndOfRowset :
while (buffer.NextRow())
{
// Do something with each row.
}
if (buffer.EndOfRowset)
{
// Optionally, do something after all rows have been processed.
}
L'exemple de code suivant montre comment un composant traite les lignes dans les tampons d'entrée pendant la méthode ProcessInput.
public override void ProcessInput( int inputID, PipelineBuffer buffer )
{
{
IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
while( buffer.NextRow())
{
// TODO: Examine the columns in the current row.
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
While buffer.NextRow() = True
' TODO: Examine the columns in the current row.
End While
End Sub
Pour plus d'informations sur le développement des composants qui reçoivent des lignes dans les tampons d'entrée, consultez Développement d'un composant de destination personnalisé et Développement d'un composant de transformation personnalisé à sorties synchrones.
|