Sviluppo di un componente di trasformazione personalizzato con output asincroni

Utilizzare un componente con output asincroni quando una trasformazione non può inviare righe all'output prima che il componente abbia ricevuto tutte le righe di input oppure quando la trasformazione non produce esattamente una riga di output per ogni riga ricevuta come input. La trasformazione Aggregazione, ad esempio, non può calcolare una somma tra righe prima di aver letto tutte le righe. Viceversa, è possibile utilizzare un componente con output sincroni quando si modifica ogni riga di dati passata. È possibile modificare i dati per ogni riga sul posto oppure creare una o più nuove colonne aggiuntive, ognuna delle quali con un valore per ogni riga di input. Per ulteriori informazioni sulla differenza tra componenti sincroni e asincroni, vedere Informazioni sulle trasformazioni sincrone e asincrone.

I componenti di trasformazione con output asincroni sono univoci, perché agiscono contemporaneamente da componenti di destinazione e di origine. Questo tipo di componente riceve righe dai componenti a monte e aggiunge righe che vengono utilizzate dai componenti a valle. Nessun altro componente del flusso di dati esegue entrambe queste operazioni.

Le colonne dei componenti a monte che sono disponibili per un componente con output sincroni sono automaticamente disponibili per i componenti a valle del componente. Pertanto, un componente con output sincroni non deve necessariamente definire colonne di output per fornire colonne e righe al componente successivo. I componenti con output asincroni, al contrario, devono definire colonne di output e fornire righe ai componenti a valle. Pertanto, un componente con output asincroni deve eseguire più attività durante la fase di progettazione e di esecuzione, mentre lo sviluppatore deve implementare più codice.

In SQL ServerIntegration Services sono incluse diverse trasformazioni con output asincroni. La trasformazione Ordinamento, ad esempio, richiede che tutte le righe siano presenti al fine di poter essere ordinate e ottiene tale risultato tramite output asincroni. Dopo aver ricevuto tutte le righe, la trasformazione le ordina e le aggiunge all'output.

In questa sezione viene illustrato in dettaglio come sviluppare trasformazioni con output asincroni. Per un esempio completo di componente di trasformazione con output asincroni, vedere gli esempi di Integration Services in Codeplex. Per ulteriori informazioni sullo sviluppo di componenti di origine, vedere Sviluppo di un componente di origine personalizzato.

Fase di progettazione

Creazione del componente

La proprietà SynchronousInputID sull'oggetto IDTSOutput100 identifica se un output è sincrono o asincrono. Per creare un output asincrono, aggiungere l'output al componente e impostare SynchronousInputID su zero. L'impostazione di questa proprietà determina anche se l'attività Flusso di dati alloca oggetti PipelineBuffer sia per l'input che per l'output del componente o se viene allocato un singolo buffer condiviso tra i due oggetti.

Nel codice di esempio seguente è illustrato un componente che crea un output asincrono nell'implementazione di ProvideComponentProperties.

using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;


namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]
    public class AsyncComponent : PipelineComponent
    {
        public override void ProvideComponentProperties()
        {
            // Call the base class, which adds a synchronous input
            // and output.
            base.ProvideComponentProperties();

            // Make the output asynchronous.
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
            output.SynchronousInputID = 0;
        }
    }
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _
Public Class AsyncComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Call the base class, which adds a synchronous input
        ' and output.
        Me.ProvideComponentProperties()

        ' Make the output asynchronous.
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
        output.SynchronousInputID = 0

    End Sub


End Class

Creazione e configurazione di colonne di output

Come accennato in precedenza, un componente asincrono aggiunge colonne alla propria raccolta di colonne di output per fornire colonne ai componenti a valle. Sono disponibili diversi metodi della fase di progettazione tra cui scegliere, a seconda dei requisiti del componente. Se ad esempio si desidera passare tutte le colonne dai componenti a monte ai componenti a valle, è necessario eseguire l'override del metodo OnInputPathAttached per aggiungere le colonne, perché si tratta del primo metodo in cui le colonne di input sono disponibili per il componente.

Se il componente crea colonne di output in base alle colonne selezionate per il proprio input, eseguire l'override del metodo SetUsageType per selezionare le colonne di output e indicare come verranno utilizzate.

Se un componente con output asincroni crea colonne di output in base alle colonne dei componenti a monte e le colonne disponibili a valle cambiano, il componente deve aggiornare la propria raccolta di colonne di output. Queste modifiche devono essere rilevate dal componente durante Validate e corrette durante ReinitializeMetaData.

[!NOTA]

Quando una colonna di output viene rimossa dalla raccolta di colonne di output, i componenti a valle del flusso di dati che fanno riferimento a tale colonna possono essere influenzati negativamente. È necessario correggere la colonna di output senza rimuoverla e ricrearla per evitare l'interruzione dei componenti a valle. Se ad esempio il tipo di dati della colonna è stato modificato, è necessario aggiornarlo.

Nell'esempio di codice seguente è illustrato un componente che aggiunge una colonna di output alla propria raccolta di colonne di output per ogni colonna disponibile del componente a monte.

public override void OnInputPathAttached(int inputID)
{
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
   IDTSVirtualInput100 vInput = input.GetVirtualInput();

   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)
   {
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();
      outCol.Name = vCol.Name;
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);
   }
}
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()

    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection

        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()
        outCol.Name = vCol.Name
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)

    Next
End Sub

Fase di esecuzione

I componenti con output asincroni eseguono anche una sequenza di metodi della fase di esecuzione diversa rispetto agli altri tipi di componenti. Innanzitutto, si tratta degli unici componenti che ricevono una chiamata ai metodi PrimeOutput e ProcessInput. I componenti con output asincroni richiedono inoltre l'accesso a tutte le righe in ingresso prima che possano iniziare l'elaborazione. Pertanto, devono memorizzare le righe di input nella cache interna finché non sono state lette tutte le righe. Infine, a differenza di altri componenti, i componenti con output asincroni ricevono sia un buffer di input che un buffer di output.

Informazioni sui buffer

Il componente riceve il buffer di input durante ProcessInput. Questo buffer contiene le righe aggiunte dai componenti a monte. Contiene inoltre le colonne dell'input del componente, oltre alle colonne fornite nell'output di un componente a monte ma che non sono state aggiunte alla raccolta di input del componente asincrono.

Il buffer di output, fornito al componente in PrimeOutput, inizialmente non contiene righe. Il componente aggiunge righe a questo buffer e lo fornisce ai componenti a valle quando è pieno. Il buffer di output contiene le colonne definite nella raccolta di colonne di output del componente, oltre alle eventuali colonne aggiunte da altri componenti a valle ai propri output.

Questo comportamento è diverso da quello dei componenti con output sincroni, che ricevono un singolo buffer condiviso. Il buffer condiviso di un componente con output sincroni contiene sia le colonne di input che di output del componente, oltre alle colonne aggiunte agli output dei componenti a monte e a valle.

Elaborazione di righe

Memorizzazione di righe di input nella cache

Quando si scrive un componente con output asincroni, è possibile scegliere tra tre diversi modi per aggiungere righe al buffer di output. È possibile aggiungerle non appena vengono ricevute righe di input, memorizzarle nella cache finché il componente non ha ricevuto tutte le righe dal componente a monte oppure aggiungerle quando è appropriato per il componente. Il metodo scelto dipende dai requisiti del componente. Ad esempio, il componente Ordinamento richiede che tutte le righe a monte vengano ricevute prima che sia possibile ordinarle. Pertanto, attende che vengano lette tutte le righe prima di aggiungere righe al buffer di output.

Il componente deve memorizzare nella cache interna le righe ricevute nel buffer di input finché non è pronto a elaborarle. Le righe del buffer in ingresso possono essere memorizzate nella cache in una tabella di dati, una matrice multidimensionale o qualsiasi altra struttura interna. Per un esempio di componente che memorizza nella cache interna le righe del buffer in ingresso finché non sono state lette tutte le righe, vedere Remove Duplicates Component Sample.

Aggiunta di righe di output

Sia che le righe vengano aggiunte al buffer di output non appena vengono ricevute o dopo la ricezione di tutte le righe, questa operazione viene eseguita chiamando il metodo AddRow sul buffer di output. Dopo aver aggiunto la nuova riga, impostare i valori di ogni colonna al suo interno.

Poiché a volte il buffer di output contiene più colonne rispetto alla raccolta di colonne di output del componente, è necessario individuare l'indice della colonna appropriata nel buffer prima di impostarne il valore. Il metodo FindColumnByLineageID della proprietà BufferManager restituisce l'indice della colonna nella riga del buffer con l'ID di derivazione specificato, che viene quindi utilizzato per assegnare il valore alla colonna di buffer.

Il metodo PreExecute, chiamato prima del metodo PrimeOutput o del metodo ProcessInput, è il primo metodo in cui è disponibile la proprietà BufferManager e rappresenta la prima opportunità per individuare gli indici delle colonne dei buffer di input e di output.

Esempio

Nell'esempio seguente è illustrato un componente di trasformazione semplice con output asincroni che aggiunge righe al buffer di output non appena vengono ricevute. Nell'esempio non vengono dimostrati tutti i metodi e le funzionalità descritti nell'argomento. Vengono descritti i metodi importanti di cui ogni componente di trasformazione personalizzato con output asincroni deve eseguire l'override, ma non è incluso codice per la convalida in fase di progettazione. Inoltre, nel codice di ProcessInput si presuppone che la raccolta di colonne di output includa un'unica colonna per ogni colonna presente nella raccolta di colonne di input. Per un esempio completo di componente di trasformazione con output asincroni, vedere Remove Duplicates Component Sample.

using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace Microsoft.Samples.SqlServer.Dts
{
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]
   public class AsynchronousOutput : PipelineComponent
   {
      PipelineBuffer outputBuffer;
      int[] inputColumnBufferIndexes;
      int[] outputColumnBufferIndexes;

      public override void ProvideComponentProperties()
      {
         // Let the base class add the input and output objects.
         base.ProvideComponentProperties();

         // Name the input and output, and make the
         // output asynchronous.
         ComponentMetaData.InputCollection[0].Name = "Input";
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;
      }
      public override void PreExecute()
      {
         IDTSInput100 input = ComponentMetaData.InputCollection[0];
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];

         for (int col = 0; col < input.InputColumnCollection.Count; col++)
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);

         for (int col = 0; col < output.OutputColumnCollection.Count; col++)
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);

      }

      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
      {
         if (buffers.Length != 0)
            outputBuffer = buffers[0];
      }
      public override void ProcessInput(int inputID, PipelineBuffer buffer)
      {
            // Advance the buffer to the next row.
            while (buffer.NextRow())
            {
               // Add a row to the output buffer.
               outputBuffer.AddRow();
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)
               {
                  // Copy the data from the input buffer column to the output buffer column.
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];
               }
            }
         if (buffer.EndOfRowset)
         {
            // EndOfRowset on the input buffer is true.
            // Set EndOfRowset on the output buffer.
            outputBuffer.SetEndOfRowset();
         }
      }
   }
}
Imports System
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Namespace Microsoft.Samples.SqlServer.Dts

    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _
    Public Class AsynchronousOutput


        Inherits PipelineComponent

        Private outputBuffer As PipelineBuffer
        Private inputColumnBufferIndexes As Integer()
        Private outputColumnBufferIndexes As Integer()

        Public Overrides Sub ProvideComponentProperties()

            ' Let the base class add the input and output objects.
            Me.ProvideComponentProperties()

            ' Name the input and output, and make the
            ' output asynchronous.
            ComponentMetaData.InputCollection(0).Name = "Input"
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0
        End Sub

        Public Overrides Sub PreExecute()

            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)

            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)

            For col As Integer = 0 To input.InputColumnCollection.Count
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)
            Next

            For col As Integer = 0 To output.OutputColumnCollection.Count
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)
            Next

        End Sub
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())

            If buffers.Length <> 0 Then
                outputBuffer = buffers(0)
            End If

        End Sub

        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)

                ' Advance the buffer to the next row.
                While (buffer.NextRow())

                    ' Add a row to the output buffer.
                    outputBuffer.AddRow()
                    For x As Integer = 0 To inputColumnBufferIndexes.Length

                        ' Copy the data from the input buffer column to the output buffer column.
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))

                    Next
                End While

            If buffer.EndOfRowset = True Then
                ' EndOfRowset on the input buffer is true.
                ' Set the end of row set on the output buffer.
                outputBuffer.SetEndOfRowset()
            End If
        End Sub
    End Class
End Namespace
Icona di Integration Services (piccola) Rimanere aggiornati con Integration Services

Per informazioni sui download, gli articoli, gli esempi e i video Microsoft più recenti, nonché sulle soluzioni selezionate dalla community, visitare la pagina Integration Services su MSDN o Technet:

Per ricevere notifiche automatiche su questi aggiornamenti, sottoscrivere i feed RSS disponibili nella pagina.