カスタム変換先コンポーネントの開発

更新 : 2006 年 4 月 14 日

Microsoft SQL Server 2005 Integration Services (SSIS) を使用すると、開発者は、任意のカスタム データ ソースに接続してデータを格納するためのカスタム変換先コンポーネントを記述することができます。カスタム変換先コンポーネントは、Integration Services に含まれている、既存の変換先コンポーネントを使用してもアクセスできないデータ ソースに接続する必要がある場合に役立ちます。

変換先コンポーネントには 1 つ以上の入力がありますが、出力はありません。デザイン時に、外部データ ソースとの接続を作成して設定を行い、そこから列メタデータを読み込みます。実行時には、外部データ ソースに接続し、データ フロー内で上流にあるコンポーネントから受け取った行を、この外部データ ソースに追加します。コンポーネントの実行前に外部データ ソースが存在する場合は、変換先コンポーネントにより、コンポーネントが受け取る列のデータ型が、外部データ ソースの列のデータ型と一致することを確認する必要もあります。

このセクションでは、変換先コンポーネントの開発方法の詳細について説明し、重要な概念をわかりやすくするため、コード例を示します。変換先コンポーネントのサンプルについては、「DatasetDestination コンポーネント サンプル」を参照してください。データ フロー コンポーネントの開発全般の概要については、「カスタム データ フロー コンポーネントの開発」を参照してください。

デザイン時

変換先コンポーネントのデザイン時の機能を実装する作業には、外部データ ソースへの接続の指定、およびコンポーネントが正しく設定されているかどうかの検証が含まれます。定義上、変換先コンポーネントには 1 つの入力と、場合によって 1 つのエラー出力があります。

コンポーネントの作成

変換先コンポーネントは、パッケージで定義された ConnectionManager オブジェクトを使用して、外部データ ソースに接続します。変換先コンポーネントは、接続マネージャに関する要求を SSIS デザイナやコンポーネントのユーザーに示すため、ComponentMetaDataRuntimeConnectionCollection コレクションに要素を追加します。このコレクションには 2 つの目的があります。1 つは、接続マネージャの必要性を SSIS デザイナに通知することで、もう 1 つは、ユーザーが接続マネージャを選択または作成した後に、コンポーネントが使用する、パッケージ内の接続マネージャへの参照を保持することです。IDTSRuntimeConnection90 がコレクションに追加されると、[詳細エディタ][接続プロパティ] タブを表示します。このタブでは、コンポーネントが使用する接続をパッケージ内で選択したり、新しく作成したりすることができます。

次のコード例は、RuntimeConnectionCollection に入力を追加し、次に IDTSRuntimeConnection90 オブジェクトを追加する ProvideComponentProperties の実装を示します。

using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;

namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
    public class DestinationComponent : PipelineComponent 
    {
        public override void ProvideComponentProperties()
        {
            // Reset the component.
            base.RemoveAllInputsOutputsAndCustomProperties();
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();

            IDTSInput90 input = ComponentMetaData.InputCollection.New();
            input.Name = "Input";

            IDTSRuntimeConnection90 connection = ComponentMetaData.RuntimeConnectionCollection.New();
            connection.Name = "ADO.net";
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

Namespace Microsoft.Samples.SqlServer.Dts

    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
    Public Class DestinationComponent
        Inherits PipelineComponent

        Public Overrides Sub ProvideComponentProperties()

            ' Reset the component.
            Me.RemoveAllInputsOutputsAndCustomProperties()
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()

            Dim input As IDTSInput90 = ComponentMetaData.InputCollection.New()
            input.Name = "Input"

            Dim connection As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New()
            connection.Name = "ADO.net"

        End Sub
    End Class
End Namespace

外部データ ソースへの接続

RuntimeConnectionCollection に接続を追加した後、AcquireConnections メソッドをオーバーライドして、外部データ ソースへの接続を確立します。このメソッドは、デザイン時と実行時に呼び出されます。コンポーネントは、実行時の接続によって指定された接続マネージャへの接続を確立し、続いて外部データ ソースへの接続を確立する必要があります。接続が確立されると、コンポーネントは接続を内部にキャッシュし、ReleaseConnections が呼び出されると解放します。開発者はこのメソッドをオーバーライドし、コンポーネントが確立した接続を AcquireConnections の実行中に解放します。これらの ReleaseConnections メソッドおよび AcquireConnections メソッドは、どちらもデザイン時と実行時に呼び出されます。

次のコード例では、AcquireConnections メソッドで ADO.NET 接続へ接続し、次に ReleaseConnections で接続を閉じるコンポーネントを示します。

using Microsoft.SqlServer.Dts.Runtime.Wrapper;

private SqlConnection sqlConnection;

public override void AcquireConnections(object transaction)
{
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
    {
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.ToConnectionManager(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;

        if (cmado == null)
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");

        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
        sqlConnection.Open();
    }
}

public override void ReleaseConnections()
{
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
        sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Private sqlConnection As SqlConnection

Public Overrides Sub AcquireConnections(ByVal transaction As Object)

    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then

        Dim cm As ConnectionManager = DtsConvert.ToConnectionManager(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)

        If IsNothing(cmado) Then
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
        End If

        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
        sqlConnection.Open()

    End If
End Sub

Public Overrides Sub ReleaseConnections()

    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
        sqlConnection.Close()
    End If

End Sub

コンポーネントの検証

コンポーネントの検証で説明しているように、変換先コンポーネントの開発者は検証を実行する必要があります。また、コンポーネントの入力列コレクションで定義されている列のデータ型プロパティが、外部データ ソースの列と一致することを検証する必要もあります。ただし、コンポーネントまたは SSIS デザイナが接続されていない状態の場合や、サーバーへのラウンド トリップが許可されない場合など、外部データ ソースに対する入力列の検証が不可能または望ましくないこともあります。このような状況でも、入力オブジェクトの ExternalMetadataColumnCollection を使用して、入力列コレクションの列を検証することができます。

このコレクションは入力オブジェクトおよび出力オブジェクトの両方に存在し、コンポーネント開発者が外部データ ソースの列から設定します。SSIS デザイナがオフラインである場合、コンポーネントが接続されていない場合、または ValidateExternalMetadata プロパティが false である場合は、このコレクションを使用して入力列を検証できます。

次のコード例は、既存の入力列に基づく外部メタデータ列を追加します。

private void AddExternalMetaDataColumn(IDTSInput90 input,IDTSInputColumn90 inputColumn)
{
    // Set the properties of the external metadata column.
    IDTSExternalMetadataColumn90 externalColumn = input.ExternalMetadataColumnCollection.New();
    externalColumn.Name = inputColumn.Name;
    externalColumn.Precision = inputColumn.Precision;
    externalColumn.Length = inputColumn.Length;
    externalColumn.DataType = inputColumn.DataType;
    externalColumn.Scale = inputColumn.Scale;

    // Map the external column to the input column.
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput90, ByVal inputColumn As IDTSInputColumn90)

    ' Set the properties of the external metadata column.
    Dim externalColumn As IDTSExternalMetadataColumn90 = input.ExternalMetadataColumnCollection.New()
    externalColumn.Name = inputColumn.Name
    externalColumn.Precision = inputColumn.Precision
    externalColumn.Length = inputColumn.Length
    externalColumn.DataType = inputColumn.DataType
    externalColumn.Scale = inputColumn.Scale

    ' Map the external column to the input column.
    inputColumn.ExternalMetadataColumnID = externalColumn.ID

End Sub

実行時

実行中は、データでいっぱいになった PipelineBuffer を上流コンポーネントから渡されるたびに、変換先コンポーネントは ProcessInput メソッドに対する呼び出しを受け取ります。このメソッドは、渡されるバッファがなくなり、EndOfRowset プロパティが true になるまで、繰り返して呼び出されます。このメソッド内で、変換先コンポーネントはバッファ内の行と列を読み取り、外部データ ソースに追加します。

バッファ内の列の検索

コンポーネントの入力バッファには、グラフ内でそのコンポーネントの上流に位置するコンポーネントの出力列コレクションで定義された、すべての列が格納されています。たとえば、変換元コンポーネントの出力に 3 つの列があり、その次のコンポーネントで出力列が 1 つ追加された場合、変換先コンポーネントが書き出す列が 2 つのみの場合でも、変換先コンポーネントに用意されたバッファには 4 つの列が格納されます。

入力バッファ内の列の順序は、変換先コンポーネントの入力列コレクション内の列のインデックスでは定義されません。バッファ行で列を確実に検索するには、BufferManagerFindColumnByLineageID メソッドを使用するしか方法がありません。このメソッドは、指定されたバッファ内で指定された系列 ID の列を検索し、行内の位置を返します。入力列のインデックスは、通常、PreExecute メソッド内で検索され、後で ProcessInput で使用するために開発者によってキャッシュされます。

次のコード例は、PreExecute でバッファ内の入力列の位置を検索し、配列に保存します。続いてこの配列を ProcessInput の処理中に使用し、バッファ内の列の値を読み取ります。

int[] cols;

public override void PreExecute()
{
    IDTSInput90 input = ComponentMetaData.InputCollection[0];

    cols = new int[input.InputColumnCollection.Count];

    for (int x = 0; x < input.InputColumnCollection.Count; x++)
    {
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
    }
}
Private cols As Integer()

Public Overrides Sub PreExecute()

    Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0)

    ReDim cols(input.InputColumnCollection.Count)

    For x As Integer = 0 To input.InputColumnCollection.Count

        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
    Next x

End Sub

行の処理

バッファ内の入力列を検索したら、列の値を読み取って外部データ ソースに書き出すことができます。

変換先コンポーネントが行を外部データ ソースに書き込んでいる間に、IncrementPipelinePerfCounter メソッドを呼び出して "Rows read" または "BLOB bytes read" パフォーマンス カウンタを更新することができます。詳細については、「データ フロー エンジンのパフォーマンスの監視」を参照してください。

次の例は、ProcessInput で提供されるバッファから行を読み取るコンポーネントを示します。バッファ内の列のインデックスは、先のコード例にある PreExecute 内で検索したものを使用しています。

public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
    if(!buffer.EndOfRowset)
    {
        while (buffer.NextRow())
        {
            foreach (int col in cols)
            {
                if (!buffer.IsNull(col))
                {
                    //  TODO: Read the column data.
                }
            }
        }
    }
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)

    If buffer.EndOfRowset = False Then

        While (buffer.NextRow())

            For Each col As Integer In cols

                If buffer.IsNull(col) = False Then

                    '  TODO: Read the column data.
                End If

            Next col
        End While
    End If
End Sub

サンプル

次の例は、ファイル接続マネージャを使用して、データ フローからのバイナリ データをファイルに保存する、簡単な変換先コンポーネントを示します。ただし、この例ではここで説明したメソッドや機能のすべてが使われているわけではありません。変換先コンポーネントが必ずオーバーライドしなければならない重要なメソッドは示していますが、デザイン時の検証のためのコードは含まれていません。変換先コンポーネントの完全なサンプルについては、「DatasetDestination コンポーネント サンプル」を参照してください。

using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;

namespace BlobDst
{
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
  public class BlobDst : PipelineComponent
  {
    string m_DestDir;
    int m_FileNameColumnIndex = -1;
    int m_BlobColumnIndex = -1;

    public override void ProvideComponentProperties()
    {
      IDTSInput90 input = ComponentMetaData.InputCollection.New();
      input.Name = "BLOB Extractor Destination Input";
      input.HasSideEffects = true;

      IDTSRuntimeConnection90 conn = ComponentMetaData.RuntimeConnectionCollection.New();
      conn.Name = "FileConnection";
    }

    public override void AcquireConnections(object transaction)
    {
      IDTSRuntimeConnection90 conn = ComponentMetaData.RuntimeConnectionCollection[0];
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);

      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
        m_DestDir += "\\";
    }

    public override IDTSInputColumn90 SetUsageType(int inputID, IDTSVirtualInput90 virtualInput, int lineageID, DTSUsageType usageType)
    {
      IDTSInputColumn90 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
      IDTSCustomProperty90 custProp;

      custProp = inputColumn.CustomPropertyCollection.New();
      custProp.Name = "IsFileName";
      custProp.Value = (object)false;

      custProp = inputColumn.CustomPropertyCollection.New();
      custProp.Name = "IsBLOB";
      custProp.Value = (object)false;

      return inputColumn;
    }

    public override void PreExecute()
    {
      IDTSInput90 input = ComponentMetaData.InputCollection[0];
      IDTSInputColumnCollection90 inputColumns = input.InputColumnCollection;
      IDTSCustomProperty90 custProp;

      foreach (IDTSInputColumn90 column in inputColumns)
      {
        custProp = column.CustomPropertyCollection["IsFileName"];
        if ((bool)custProp.Value == true)
        {
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
        }

        custProp = column.CustomPropertyCollection["IsBLOB"];
        if ((bool)custProp.Value == true)
        {
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
        }
      }
    }

    public override void ProcessInput(int inputID, PipelineBuffer buffer)
    {
      while (buffer.NextRow())
      {
        string strFileName = buffer.GetString(m_FileNameColumnIndex);
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);

        strFileName = TranslateFileName(strFileName);

        // Make sure directory exists before creating file.
        FileInfo fi = new FileInfo(strFileName);
        if (!fi.Directory.Exists)
          fi.Directory.Create();

        // Write the data to the file.
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
        fs.Write(blobData, 0, blobLength);
        fs.Close();
      }
    }

    private string TranslateFileName(string fileName)
    {
      if (fileName.Length > 2 && fileName[1] == ':')
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);
      else
        return m_DestDir + fileName;
    }
  }
}
Imports System 
Imports System.IO 
Imports Microsoft.SqlServer.Dts.Pipeline 
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper 
Namespace BlobDst 

 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _ 
 Public Class BlobDst 
 Inherits PipelineComponent 
   Private m_DestDir As String 
   Private m_FileNameColumnIndex As Integer = -1 
   Private m_BlobColumnIndex As Integer = -1 

   Public  Overrides Sub ProvideComponentProperties() 
     Dim input As IDTSInput90 = ComponentMetaData.InputCollection.New 
     input.Name = "BLOB Extractor Destination Input" 
     input.HasSideEffects = True 
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New 
     conn.Name = "FileConnection" 
   End Sub 

   Public  Overrides Sub AcquireConnections(ByVal transaction As Object) 
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection(0) 
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String) 
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then 
       m_DestDir += "\" 
     End If 
   End Sub 

   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput90, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn90 
     Dim inputColumn As IDTSInputColumn90 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType) 
     Dim custProp As IDTSCustomProperty90 
     custProp = inputColumn.CustomPropertyCollection.New 
     custProp.Name = "IsFileName" 
     custProp.Value = CType(False, Object) 
     custProp = inputColumn.CustomPropertyCollection.New 
     custProp.Name = "IsBLOB" 
     custProp.Value = CType(False, Object) 
     Return inputColumn 
   End Function 

   Public  Overrides Sub PreExecute() 
     Dim input As IDTSInput90 = ComponentMetaData.InputCollection(0) 
     Dim inputColumns As IDTSInputColumnCollection90 = input.InputColumnCollection 
     Dim custProp As IDTSCustomProperty90 
     For Each column As IDTSInputColumn90 In inputColumns 
       custProp = column.CustomPropertyCollection("IsFileName") 
       If CType(custProp.Value, Boolean) = True Then 
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer) 
       End If 
       custProp = column.CustomPropertyCollection("IsBLOB") 
       If CType(custProp.Value, Boolean) = True Then 
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer) 
       End If 
     Next 
   End Sub 

   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer) 
     While buffer.NextRow 
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex) 
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer) 
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength) 
       strFileName = TranslateFileName(strFileName) 
       Dim fi As FileInfo = New FileInfo(strFileName) 
       ' Make sure directory exists before creating file.
       If Not fi.Directory.Exists Then 
         fi.Directory.Create 
       End If 
       ' Write the data to the file.
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None) 
       fs.Write(blobData, 0, blobLength) 
       fs.Close 
     End While 
   End Sub 

   Private Function TranslateFileName(ByVal fileName As String) As String 
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then 
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3) 
     Else 
       Return m_DestDir + fileName 
     End If 
   End Function 
 End Class 
End Namespace

参照

概念

カスタム変換元コンポーネントの開発
スクリプト コンポーネントによる変換先の作成

ヘルプおよび情報

SQL Server 2005 の参考資料の入手

変更履歴

リリース 履歴

2006 年 4 月 14 日

新しい内容 :
  • 書き込まれる行のパフォーマンス カウンタの可用性を説明しました。