複数の入力を持つデータ フロー コンポーネントの開発

複数の入力によってデータが不均一なレートで生成される場合に、複数の入力があるデータ フローコンポーネントによって過度にメモリが消費されることがあります。 複数の入力をサポートするカスタム データ フロー コンポーネントを開発するときは、Microsoft.SqlServer.Dts.Pipeline 名前空間の次のメンバーを使用してこのメモリの負荷を管理できます。

  • DtsPipelineComponentAttribute クラスの DtsPipelineComponentAttribute.SupportsBackPressure プロパティ。 カスタム データ フロー コンポーネントに必要なコードを実装して不均一なレートのデータ フローを管理する場合は、このプロパティの値を true に設定します。

  • PipelineComponent クラスの IsInputReady メソッド。 SupportsBackPressure プロパティを true に設定する場合は、このメソッドを実装する必要があります。 実装しない場合、データ フロー エンジンは実行時に例外を発生させます。

  • PipelineComponent クラスの GetDependentInputs メソッド。 SupportsBackPressure プロパティを true に設定し、カスタム コンポーネントが複数の入力をサポートしている場合は、このメソッドも実装する必要があります。 実装しない場合、ユーザーが複数の入力をアタッチすると、データ フロー エンジンは実行時に例外を発生させます。

また、これらのメンバーを使用すると、マージ変換およびマージ結合変換用にマイクロソフトが開発したような、メモリ負荷のためのソリューションを開発できます。

SupportsBackPressure プロパティの設定

複数の入力をサポートするカスタム データ フロー コンポーネントで、より効率的なメモリ管理を実装するには、まず DtsPipelineComponentAttributeSupportsBackPressure プロパティの値を true に設定します。 SupportsBackPressure の値が true の場合、データ フロー エンジンは IsInputReady メソッドを呼び出し、複数の入力があるときは、実行時に GetDependentInputs メソッドも呼び出します。

次のコード例では、DtsPipelineComponentAttribute を実装して、SupportsBackPressure の値を true に設定します。

[DtsPipelineComponent(ComponentType = ComponentType.Transform,
        DisplayName = "Shuffler",
        Description = "Shuffle the rows from input.",
        SupportsBackPressure = true,
        LocalizationType = typeof(Localized),
        IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")
]
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent
        {
          ...
        }

IsInputReady メソッドの実装

DtsPipelineComponentAttribute オブジェクトの SupportsBackPressure プロパティの値を true に設定する場合は、PipelineComponent クラスの IsInputReady メソッドも実装する必要があります。

注意

IsInputReady メソッドの実装では、基本クラスで実装を呼び出す必要はありません。 基本クラスでのこのメソッドの既定の実装では、NotImplementedException を発生させるだけです。

このメソッドを実装し、コンポーネントの各入力に対して Boolean 型の canProcess 配列で要素の状態を設定します (入力は inputIDs 配列内の ID 値によって識別されます)。入力に対して canProcess 配列の要素の値を true に設定すると、データ フロー エンジンは、コンポーネントの ProcessInput メソッドを呼び出し、指定した入力に対して追加データを提供します。

アップストリーム データを追加で使用できますが、少なくとも 1 つの入力に対して canProcess 配列要素の値を常に true に設定する必要があります。設定しない場合は、処理が停止します。

データ フロー エンジンは、データの各バッファーを送信する前に IsInputReady メソッドを呼び出して、追加データの受信を待機している入力を判断します。 入力がブロックされていることが戻り値によって示された場合、データ フロー エンジンは、バッファーをコンポーネントに送信する代わりに、入力データの追加バッファーを一時的にキャッシュします。

注意

独自に記述するコード内で、IsInputReady または GetDependentInputs メソッドは呼び出しません。 データ フロー エンジンはコンポーネントを実行するとき、これらのメソッドやオーバーライドする PipelineComponent クラスのその他のメソッドを呼び出します。

次の例では、IsInputReady メソッドの実装は、以下の条件に該当する場合に、入力が追加データを受信するために待機していることを示します。

  • 入力に対する追加の上流データがある (!inputEOR)。

  • コンポーネントが、受信済みのバッファーで入力に対して処理できるデータを現在持っていない (inputBuffers[inputIndex].CurrentRow() == null)。

入力が追加のデータを待っている場合、データ フロー コンポーネントは、この入力に対応する canProcess 配列の要素の値を true に設定することによってこの状態を示します。

逆に、コンポーネントに入力を処理できるデータがある場合、この例は入力の処理を中断します。 この例では、入力に対応する canProcess 配列の要素の値を false に設定することによってこれを行います。

public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)
{
    for (int i = 0; i < inputIDs.Length; i++)
    {
        int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);

        canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)
            && !inputEOR[inputIndex];
    }
}

前の例は、Boolean 型の inputEOR 配列を使用して、各入力で追加のアップストリーム データを使用できるかどうかを示します。 配列名の EOR は "行セットの末尾 (end of rowset)" を示し、データ フロー バッファーの EndOfRowset プロパティを参照します。 ここには示してありませんが、この例では、ProcessInput メソッドが、受信するデータの各バッファーの EndOfRowset プロパティの値を確認します。 値 true によって、入力でこれ以上のアップストリーム データを利用できないことが示された場合、その入力の inputEOR 配列要素の値を true に設定します。 IsInputReady メソッドのこの例では、inputEOR 配列要素の値によって入力に使用できるアップストリーム データがないことが示された場合、canProcess 配列の入力に対応する要素の値を false に設定します。

GetDependentInputs メソッドの実装

カスタム データ フロー コンポーネントが複数の入力をサポートしている場合は、PipelineComponent クラスの GetDependentInputs メソッドも実装する必要があります。

注意

GetDependentInputs メソッドの実装では、基本クラスで実装を呼び出す必要はありません。 基本クラスでのこのメソッドの既定の実装では、NotImplementedException を発生させるだけです。

データ フロー エンジンは、ユーザーが複数の入力をコンポーネントにアタッチする場合にのみ、GetDependentInputs メソッドを呼び出します。 コンポーネントの入力が 2 つしかないときに 1 つの入力がブロックされていることが IsInputReady メソッドによって示される場合 (canProcess = false)、データ フロー エンジンは、もう一方の入力が追加のデータを待っていることを認識します。 ただし、入力が 2 つより多い場合に、IsInputReady メソッドによって 1 つの入力がブロックされていることが示されたときは、GetDependentInputs の追加のコードはどの入力が追加データの受信を待機しているかを示します。

注意

独自に記述するコード内で、IsInputReady または GetDependentInputs メソッドは呼び出しません。 データ フロー エンジンはコンポーネントを実行するとき、これらのメソッドやオーバーライドする PipelineComponent クラスのその他のメソッドを呼び出します。

ある 1 つの入力がブロックされている場合、GetDependentInputs メソッドの次の実装は、追加データの受信を待機している入力のコレクションを返し、はじめの 1 つの入力はブロッキング状態のままになります。 コンポーネントは、既に受信したバッファー内に、ブロックされている入力以外の入力が処理できるデータが現在ないことをチェックして、ブロックしている入力を特定します (inputBuffers[i].CurrentRow() == null)。 次に、GetDependentInputs メソッドは、ブロックしている入力のコレクションを入力 ID のコレクションとして返します。

        public override Collection<int> GetDependentInputs(int blockedInputID)
        {
            Collection<int> currentDependencies = new Collection<int>();
            for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)
            {
                if (ComponentMetaData.InputCollection[i].ID != blockedInputID
                    && inputBuffers[i].CurrentRow() == null)
                {
                    currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);
                }
            }
            
            return currentDependencies;
        }