同期出力型のカスタム変換コンポーネントの開発

同期出力型の変換コンポーネントは、上流コンポーネントから行を受け取り、これらの行の列の値を読み取ったり変更したりして、下流コンポーネントに渡します。 このコンポーネントは、上流コンポーネントから提供される列から派生する、別の出力列も定義しますが、データ フローに行を追加することはありません。 同期コンポーネントと非同期コンポーネントの相違点の詳細については、「同期および非同期変換について」を参照してください。

この種類のコンポーネントは、データがコンポーネントに提供されたときにインラインで変更され、処理する前にコンポーネントがすべての行を確認する必要がないタスクに適しています。 通常、同期出力型の変換では、外部データ ソースへの接続、外部メタデータ列の管理、および出力バッファーへの行の追加は行われないため、このコンポーネントを開発するのは最も簡単です。

同期出力型の変換コンポーネントを作成するには、コンポーネント用に選択された上流列を格納する IDTSInput100 オブジェクトと、コンポーネントによって作成された派生列を格納する IDTSOutput100 オブジェクトを追加します。 さらに、デザイン時のメソッドを実装し、実行中に受信バッファー行内の列を読み取ったり変更するコードを設定します。

このセクションでは、カスタム変換コンポーネントの実装に必要な情報、および概念の理解に役立つコード例を提供します。

デザイン時

このコンポーネントのデザイン時のコードでは、入力と出力の作成、コンポーネントが生成するすべての出力の追加、および、コンポーネントの構成の検証を行います。

コンポーネントの作成

コンポーネントの入力、出力、およびカスタム プロパティは、通常、ProvideComponentProperties メソッドの実行中に作成します。 同期出力型の変換コンポーネントの入力および出力を追加するには、2 つの方法があります。 メソッドの基本クラスの実装を使用して、作成された既定の入力および出力を変更する方法と、入力と出力を手動で明示的に追加する方法です。

次のコード例は、入力および出力オブジェクトを明示的に追加する ProvideComponentProperties を実装します。 同様の処理を行うために呼び出す基本クラスについては、コメントに記載されています。

using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;

namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "SynchronousComponent", ComponentType = ComponentType.Transform)]
    public class SyncComponent : PipelineComponent
    {

        public override void ProvideComponentProperties()
        {
            // Add the input.
            IDTSInput100 input = ComponentMetaData.InputCollection.New();
            input.Name = "Input";

            // Add the output.
            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
            output.Name = "Output";
            output.SynchronousInputID = input.ID;

            // Alternatively, you can let the base class add the input and output
            // and set the SynchronousInputID of the output to the ID of the input.
            // base.ProvideComponentProperties();
        }
    }
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

<DtsPipelineComponent(DisplayName:="SynchronousComponent", ComponentType:=ComponentType.Transform)> _
Public Class SyncComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Add the input.
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
        input.Name = "Input"

        ' Add the output.
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
        output.Name = "Output"
        output.SynchronousInputID = Input.ID

        ' Alternatively, you can let the base class add the input and output
        ' and set the SynchronousInputID of the output to the ID of the input.
        ' base.ProvideComponentProperties();

    End Sub

End Class

出力列の作成と設定

同期出力型の変換コンポーネントでは、バッファーに行は追加されません。ただし、コンポーネントの出力に出力列を追加することはできます。 通常、コンポーネントが出力列を追加すると、新しい出力列の値は、実行時に上流コンポーネントから渡される 1 つ以上の列に含まれるデータから取得されます。

出力列が作成されたら、データ型のプロパティを設定する必要があります。 出力列のデータ型のプロパティを設定するには特別な処理が必要で、その処理は、SetDataTypeProperties メソッドを呼び出して実行します。 このメソッドが必要なのは、DataTypeLengthPrecision、および CodePage の各プロパティが読み取り専用で、各プロパティの設定が他の設定に依存しているためです。 このメソッドを使用すると、プロパティの値を確実に矛盾なく設定でき、データ フロー タスクにより、正しく設定されていることが検証されます。

列の DataType により、他のプロパティに設定される値が決定されます。 次の表は、各 DataType の依存するプロパティの要件を示しています。 ここに示されていないデータ型の場合、依存するプロパティは 0 に設定されます。

データ型

長さ

小数点以下桁数

有効桁数

コード ページ

DT_DECIMAL

0

0 より大きく 28 以下

0

0

DT_CY

0

0

0

0

DT_NUMERIC

0

0 より大きく 28 以下で、有効桁数の値未満

1 以上 38 以下

0

DT_BYTES

0 より大きい

0

0

0

DT_STR

0 より大きく 8000 未満

0

0

0 以外の有効なコード ページ

DT_WSTR

0 より大きく 4000 未満

0

0

0

データ型プロパティの制約は出力列のデータ型に基づくため、マネージ型を処理する場合、Integration Services の正しいデータ型を選択する必要があります。 基本クラスには、ConvertBufferDataTypeToFitManagedBufferTypeToDataRecordType、および DataRecordTypeToBufferType の 3 つのヘルパー メソッドがあり、これを使用すると、マネージ コンポーネントの開発者は、マネージ型に対応する SSIS のデータ型を適切に選択できます。 これらのメソッドは、マネージ データ型と SSIS のデータ型を相互に変換します。

実行時

一般的に、コンポーネントのランタイム部分の実装は、コンポーネントの入力および出力列をバッファー内で検索するタスクと、受信バッファー行内にあるこれらの列の値を読み取りまたは書き込みするタスクの 2 つに分類されます。

バッファー内の列の検索

実行中にコンポーネントに提供されるバッファー内の列数は、コンポーネントの入力または出力コレクション内の列数よりも大きい場合があります。 各バッファーには、データ フローのコンポーネントで定義されているすべての出力列が含まれているためです。 バッファーの列が入力または出力の列と正しく一致していることを確認するには、BufferManagerFindColumnByLineageID メソッドを使用する必要があります。 このメソッドは、特定のバッファー内の列を系列 ID によって検索します。 通常、列は PreExecute の実行中に検索されます。これは、BufferManager プロパティが使用できるようになる最初の実行時メソッドであるためです。

次のコード例は、PreExecute の実行中に、入力および出力列のコレクションで列のインデックスを検索するコンポーネントを示します。 列インデックスは整数の配列に格納され、ProcessInput の実行中にコンポーネントからアクセスできます。

int []inputColumns;
int []outputColumns;

public override void PreExecute()
{
    IDTSInput100 input = ComponentMetaData.InputCollection[0];
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    inputColumns = new int[input.InputColumnCollection.Count];
    outputColumns = new int[output.OutputColumnCollection.Count];

    for(int col=0; col < input.InputColumnCollection.Count; col++)
    {
        IDTSInputColumn100 inputColumn = input.InputColumnCollection[col];
        inputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID);
    }

    for(int col=0; col < output.OutputColumnCollection.Count; col++)
    {
        IDTSOutputColumn100 outputColumn = output.OutputColumnCollection[col];
        outputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID);
    }

}
Public Overrides Sub PreExecute()

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)


    ReDim inputColumns(input.InputColumnCollection.Count)
    ReDim outputColumns(output.OutputColumnCollection.Count)

    For col As Integer = 0 To input.InputColumnCollection.Count

        Dim inputColumn As IDTSInputColumn100 = input.InputColumnCollection(col)
        inputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID)
    Next

    For col As Integer = 0 To output.OutputColumnCollection.Count

        Dim outputColumn As IDTSOutputColumn100 = output.OutputColumnCollection(col)
        outputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID)
    Next

End Sub

行の処理

コンポーネントは、ProcessInput メソッドで、行および列を含む PipelineBuffer オブジェクトを受け取ります。 このメソッドではバッファー内の行が繰り返され、PreExecute が読み取られて変更されている間に列が識別されます。 このメソッドは、上流コンポーネントから行が渡されなくなるまで、データ フロー タスクによって繰り返し呼び出されます。

バッファー内の列は、配列のインデクサーにアクセスする方法を使用するか、Get または Set メソッドのいずれかを使用することにより、個別に読み取りまたは書き込みが行われます。 バッファー内の列のデータ型がわかっている場合には、Get および Set メソッドの方が効率的であり、こちらを使用するようお勧めします。

次のコード例では、受信行を処理する ProcessInput メソッドを実装します。

public override void ProcessInput( int InputID, PipelineBuffer buffer)
{
       while( buffer.NextRow())
       {
            for(int x=0; x < inputColumns.Length;x++)
            {
                if(!buffer.IsNull(inputColumns[x]))
                {
                    object columnData = buffer[inputColumns[x]];
                    // TODO: Modify the column data.
                    buffer[inputColumns[x]] = columnData;
                }
            }
        
      }
}
Public Overrides Sub ProcessInput(ByVal InputID As Integer, ByVal buffer As PipelineBuffer)

        While (buffer.NextRow())

            For x As Integer = 0 To inputColumns.Length

                if buffer.IsNull(inputColumns(x)) = false then

                    Dim columnData As Object = buffer(inputColumns(x))
                    ' TODO: Modify the column data.
                    buffer(inputColumns(x)) = columnData

                End If
            Next

        End While
End Sub

サンプル

次のサンプルは、すべての文字列型の列の値を大文字に変換する、簡単な同期出力型の変換コンポーネントを示しています。 ただし、この例ではここで説明したメソッドや機能のすべてが使われているわけではありません。 同期出力型の変換コンポーネントで必ずオーバーライドしなければならない重要なメソッドは示していますが、デザイン時の検証のためのコードは含まれていません。

using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace Uppercase
{
  [DtsPipelineComponent(DisplayName = "Uppercase")]
  public class Uppercase : PipelineComponent
  {
    ArrayList m_ColumnIndexList = new ArrayList();

    public override void ProvideComponentProperties()
    {
      base.ProvideComponentProperties();
      ComponentMetaData.InputCollection[0].Name = "Uppercase Input";
      ComponentMetaData.OutputCollection[0].Name = "Uppercase Output";
    }

    public override void PreExecute()
    {
      IDTSInput100 input = ComponentMetaData.InputCollection[0];
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;

      foreach (IDTSInputColumn100 column in inputColumns)
      {
        if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
        {
          m_ColumnIndexList.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
        }
      }
    }

    public override void ProcessInput(int inputID, PipelineBuffer buffer)
    {
      while (buffer.NextRow())
      {
        foreach (int columnIndex in m_ColumnIndexList)
        {
          string str = buffer.GetString(columnIndex);
          buffer.SetString(columnIndex, str.ToUpper());
        }
      }
    }
  }
}
Imports System 
Imports System.Collections 
Imports Microsoft.SqlServer.Dts.Pipeline 
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper 
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper 
Namespace Uppercase 

 <DtsPipelineComponent(DisplayName="Uppercase")> _ 
 Public Class Uppercase 
 Inherits PipelineComponent 
   Private m_ColumnIndexList As ArrayList = New ArrayList 

   Public  Overrides Sub ProvideComponentProperties() 
     MyBase.ProvideComponentProperties 
     ComponentMetaData.InputCollection(0).Name = "Uppercase Input" 
     ComponentMetaData.OutputCollection(0).Name = "Uppercase Output" 
   End Sub 

   Public  Overrides Sub PreExecute() 
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0) 
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection 
     For Each column As IDTSInputColumn100 In inputColumns 
       If column.DataType = DataType.DT_STR OrElse column.DataType = DataType.DT_WSTR Then 
         m_ColumnIndexList.Add(CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)) 
       End If 
     Next 
   End Sub 

   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer) 
     While buffer.NextRow 
       For Each columnIndex As Integer In m_ColumnIndexList 
         Dim str As String = buffer.GetString(columnIndex) 
         buffer.SetString(columnIndex, str.ToUpper) 
       Next 
     End While 
   End Sub 
 End Class 
End Namespace
Integration Services のアイコン (小) 最新の Integration Services の入手

マイクロソフトが提供する最新のダウンロード、アーティクル、サンプル、ビデオ、およびコミュニティで選択されたソリューションについては、MSDN の Integration Services のページを参照してください。


これらの更新が自動で通知されるようにするには、ページの RSS フィードを定期受信します。

関連項目

概念

非同期出力型のカスタム変換コンポーネントの開発

同期および非同期変換について

スクリプト コンポーネントによる同期変換の作成