データ プロセッサ パイプラインで最後の既知の値を使用する

重要

Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。

一般公開リリースが利用可能になった場合は、新しい Azure IoT Operations インストールをデプロイする必要があります。プレビュー インストールをアップグレードすることはできません。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

データ プロセッサ パイプライン内の最後の既知の値 (LKV) ステージを使用して、データの最新の完全なレコードを保守します。 LKV ステージでは、パイプライン内のメッセージに関するキーと値のペアの最新の値を追跡します。 その後このステージで、追跡対象の LKV 値を使用してメッセージをエンリッチできます。 最後の既知の値の追跡とエンリッチメントは、次のものに依存するダウンストリーム プロセスにとって重要です。

  • 特定の timestamp での複数の時系列のデータ ポイント。
  • 特定のキーの値が常にあるペイロード。

データ処理パイプラインでは、LKV ステージはオプションのステージです。 LKV ステージを使用すると、次のことができます。

  • パイプラインに複数の LKV ステージを追加します。 各 LKV ステージで、複数の値を追跡できます。
  • 保存された LKV 値を使用してメッセージをエンリッチし、データが完全で包括的な状態に保たれるようにします。
  • 受信メッセージからの最新の値を使用して LKV が自動的に更新される状態を維持します。
  • 論理パーティションごとに別個に LKV を追跡します。 LKV ステージは、各論理パーティション内で独立して動作します。
  • 追跡対象の LKV ごとに期限切れ日時を構成して、有効な状態に保たれる期間を管理します。 このように管理すると、古い値を使用してメッセージがエンリッチされないようにするのに役立ちます。

LKV ステージでは、時系列のデータ整合性が維持されます。 このステージでは、古い timestamp のメッセージで、新しい timestamp のメッセージを使用して LKV がオーバーライドされたり置き換えられたりしません。

LKV ステージは、追跡対象の最後の既知の値を使用して受信メッセージをエンリッチします。 これらのエンリッチされた値は以前に記録されたデータを表しており、必ずしも現在のリアルタイムの値であるとは限りません。 このビヘイビアーが想定されているデータ処理に合っていることを確認してください。

前提条件

集計パイプライン ステージを構成して使用するには、オプションのデータ プロセッサ コンポーネントを含むデータ プロセッサのデプロイされたインスタンスが必要です。

ステージの構成

LKV ステージの JSON 構成で、ステージの詳細を定義します。 ステージを作成するには、フォーム ベースの UI を操作するか、[詳細設定] タブで JSON 構成を指定します。

フィールド Description 必要 Default
名前 ステージのユーザー定義名。 はい - lkv1
説明 ステージのユーザー定義の説明。 いいえ - lkv1
プロパティ > の入力パス 追跡対象のキーのパス はい - .payload.temperature
プロパティ > の出力パス 出力メッセージ内の、LKV を書き込む場所へのパス はい - .payload.temperature_lkv
プロパティ > の有効期限 追跡対象のLKV はユーザー定義の期間のみ有効で、その後は出力メッセージは保存された値でエンリッチされません。 有効期限は LKV キーごとに追跡されます。 いいえ - 10h
プロパティ > の timestamp パス LKV が最後に更新された時点の timestamp を書き込む、出力メッセージ内の場所へのパス いいえ False -

timestamp パスを含めると、LKV が記録された時点を正確に把握し、透明性と追跡可能性を高めることができます。

inputPathoutputPath が等しい

送信メッセージは、実際のメッセージ値か、または追跡対象のキーがメッセージ ペイロード内にない場合は LKV の値です。 受信値が優先され、このステージで LKV を使用してオーバーライドされることはありません。 メッセージ値が LKV 値かどうかを識別するには、timestamp パスを使用します。 timestamp パスは、メッセージ内の値が追跡対象 LKV の場合のみ送信メッセージ内に含まれます。

inputPathoutputPath が等しくない

すべての受信メッセージについて、このステージで LKV が outputPath に書き込まれます。 この構成は、後続のメッセージ ペイロード内の値の違いを追跡するのに使用します。

サンプル構成

次の例は、LKV ステージのサンプル メッセージを示しています。このメッセージは 10:02 に着信し、ペイロードに追跡対象の LKV 値 .payload.temperature が含まれています。

{ 
  { 
    "systemProperties":{ 
        "partitionKey":"pump", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/pump/#" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

LKV 構成:

フィールド
入力パス* .payload.temperature
出力パス .payload.lkvtemperature
期限切れ日時 10h
timestamp パス .payload.lkvtemperature_timestamp

追跡対象の LKV 値は次のとおりです。

  • .payload.temperature は 250 です。
  • LKV の timestamp は 2023-01-11T10:02:07Z です

11:05 に着信し、ペイロードに温度のプロパティがないメッセージの場合、次のように LKV ステージは追跡対象の値を使用してメッセージをエンリッチします。

11:05 の時点の LKV ステージへの入力例:

{ 
    "systemProperties":{ 
        "partitionKey":"pump", 
        "partitionId":5, 
        "timestamp":"2023-01-11T11:05:00Z" 
    }, 
    "qos":1, 
    "topic":"/assets/pump/#" 
    }, 
    "payload":{ 
        "runningState": true 
    } 
} 

11:05 の時点の LKV ステージからの出力例:

{ 
    "systemProperties":{ 
        "partitionKey":"pump", 
        "partitionId":5, 
        "timestamp":"2023-01-11T11:05:00Z" 
    }, 
    "qos":1, 
    "topic":"/assets/pump/#" 
    }, 
    "payload":{ 
        "lkvtemperature":250, 
        "lkvtemperature_timestamp"":"2023-01-11T10:02:07Z" 
        "runningState": true 
    } 
}