データ プロセッサの構成パターンとは
重要
Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。
一般公開リリースが利用可能になった場合は、新しい Azure IoT Operations インストールをデプロイする必要があります。プレビュー インストールをアップグレードすることはできません。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
いくつかの種類の構成は、複数のパイプライン ステージに共通です。 この記事では、パス、バッチ、テンプレート、再試行、継続時間の構成パターンについて説明します。
Path
いくつかのパイプライン ステージでは、データの読み取りまたは書き込みを行う必要があるメッセージ内の場所を示すために、パスを使います。 これらの場所を定義するには、jq 構文を使用する Path
フィールドを使います。
- パスは、UI 内の文字列として定義され、jq 構文を使います。
- パスは、データ プロセッサ メッセージのルートを基準にして定義されます。 パス
.
は、メッセージ全体を参照します。 - すべてのパスは
.
から始まります。 - パスの各セグメントには次のものを使用できます。
- 英数字オブジェクト キーの
.<identifier>
(例:.topic
)。 - 任意のオブジェクト キーの
."<identifier>"
(例:."asset-id"
)。 - 任意のオブジェクト キーの
["<identifier>"]
(例:["asset-id"]
)。 - 配列インデックスの
[<index>]
(例:[2]
)。
- 英数字オブジェクト キーの
- セグメントを連結して複雑なパスを作成できます。
個々のパス セグメントは、次の正規表現に準拠している必要があります。
パターン | 正規表現 | 例 |
---|---|---|
.<identifier> |
\.[a-zA-Z_][a-zA-Z0-9_]* |
.topic |
."<identifier>" |
\."(\\"\|[^"])*" |
."asset-id" |
["<identifier>"] |
\["(\\"\|[^"])*"\] |
["asset-id"] |
[<index>] |
\[-?[0-9]+\] |
[2] |
詳しくは、jq ガイドの「パス」をご覧ください。
パスの例
次のパスは、Data Processor の標準のメッセージ構造に基づく例です。
.systemProperties
が次のように返します。{ "partitionKey": "foo", "partitionId": 5, "timestamp": "2023-01-11T10:02:07Z" }
.payload.humidity
が次のように返します。10
.userProperties[0]
が次のように返します。{ "key": "prop1", "value": "value1" }
.userProperties[0].value
が次のように返します。value1
長さ
いくつかのステージでは、"期間" の継続時間を定義する必要があります。 たとえば、集計ステージでの対象範囲や、コールアウト ステージでのタイムアウトなどです。 これらのステージでは、Duration
フィールドを使ってこの値を表します。
いくつかのパイプライン ステージでは、"期間" の継続時間が使われます。 たとえば、集計ステージには windows プロパティがあり、コールアウト ステージには timeouts プロパティがあります。 これらの期間を定義するには、Duration
フィールドを使います。
Duration は、<number><char><number><char>
という形式の文字列値であり、`char`` では以下を指定できます。
h
: 時間m
: 分s
: 秒ms
: ミリ秒us
、µs
: マイクロ秒ns
: ナノ秒
Duration の例
2h
1h10m30s
3m9ms400ns
テンプレート
いくつかのステージでは、動的な値と静的な値の組み合わせで文字列を定義する必要があります。 このようなステージでは、"テンプレート" 値を使います。
データ プロセッサ テンプレートでは、 Mustache 構文 を使用して文字列内の動的な値を定義します。
テンプレートで使用できる動的なシステム値は次のとおりです。
instanceId
: Data Processor インスタンス ID。pipelineId
: ステージが含まれるパイプライン ID。YYYY
: 現在の年。MM
: 現在の月。DD
: 現在の日。HH
: 現在の時。mm
: 現在の分。
現時点では、ターゲット ステージでテンプレートを使ってファイル パスを定義できます。
静的フィールドと動的フィールド
一部のステージでは値を定義する必要があり、静的な文字列またはメッセージ内の Path
から導出される動的な値を使用できます。 これらの値を定義するには、"静的" または "動的" なフィールドを使用できます。
静的または動的なフィールドは常にオブジェクトとして記述し、その type
フィールドで static
または dynamic
のいずれかの値を指定します。 スキーマは type
によって異なります。
静的フィールド
静的な定義は、type
が static
の場合の固定値です。 実際の値は value
に保持されます。
フィールド | タイプ | 内容 | 必要 | Default | 例 |
---|---|---|---|---|---|
type | const 文字列 | フィールドの種類。 | はい | - | static |
value | 任意 | 構成に使用する静的な値 (通常は文字列)。 | はい | - | "static" |
次の例は、いくつかの静的フィールドの定義を示しています。
{
"some-field": {
"type": "static",
"value": "some-static-value"
}
}
{
"some-boolean-field": {
"type": "static",
"value": true
}
}
{
"some-complex-field": {
"type": "static",
"value": {
"some": [
"nested",
"data"
]
}
}
}
動的フィールド
type
の固定値は dynamic
です。 値は、jq path です。
フィールド | タイプ | 内容 | 必要 | Default | 例 |
---|---|---|---|---|---|
type | const 文字列 | フィールドの種類。 | はい | - | dynamic |
value | Path | フィールドの値を動的に取得できる各メッセージ内のパス。 | はい | - | .systemProperties.partitionKey |
動的フィールドの定義を示す例を以下に示します。
{
"some-field": {
"type": "dynamic",
"value": ".systemProperties.topic"
}
}
再試行
外部サービスを呼び出すステージでは、再試行を使用して一時的な障害を処理し、信頼性を向上させることができます。 ステージを構成するときに、既定の再試行ポリシーをオーバーライドできます。
指定できる再試行ポリシーは 4 つあります。
default
: 既定の再試行ポリシーは、3 回の再試行と 500 ミリ秒の初期再試行間隔でエクスポネンシャル バックオフ再試行を使用します。none
: 再試行は実行されません。fixed
: 固定再試行ポリシーは、各再試行の間隔を固定して、固定回数だけ再試行を行います。exponential
: エクスポネンシャル再試行ポリシーは、各再試行の間隔が指数関数的に増加させながら、固定回数の再試行を行います。
default
または none
を選択した場合は、それ以上構成を指定する必要はありません。 fixed
または exponential
を選択した場合は、さらに構成を指定する必要があります。
固定
フィールド | タイプ | 説明 | 必須 | 既定値 | 例 |
---|---|---|---|---|---|
type |
string | 再試行の種類の名前: fixed |
はい | 該当なし | fixed |
interval |
期間 | 各再試行間の初期時間。 | いいえ | 500ms |
2s |
maxRetries |
int | 再試行回数を 1 回から 20 回にします |
いいえ | 3 |
20 |
指数
フィールド | タイプ | 説明 | 必須 | 既定値 | 例 |
---|---|---|---|---|---|
type |
string | 再試行の種類の名前: exponential |
はい | 該当なし | exponential |
interval |
期間 | 各再試行間の初期時間。 | いいえ | 500ms |
2s |
maxRetries |
int | 再試行回数を 1 回から 20 回にします |
いいえ | 3 |
20 |
maxInterval |
期間 | 各再試行間の最大期間。 | いいえ | なし | 100s |
エクスポネンシャル再試行時間は、次のように計算されます。間隔が 500ms
で始まる場合、次の再試行間隔は randInt(500ms, 500ms * 2^1)
、randInt(500ms, 500ms * 2^2)
、...、randInt(500ms, 500ms * 2^maxRetries)
のようになります。
Batch
出力ステージのいくつかのターゲットでは、ターゲットに書き込む前にメッセージをバッチ処理できます。 これらのターゲットでは、batch を使ってバッチ処理パラメーターを定義します。
現時点では、日時に基づいて batch を定義できます。 バッチ処理を定義するには、期間とパスを指定する必要があります。 Path
では、バッチ処理された出力に含める受信メッセージの部分を定義します。
フィールド | タイプ | 内容 | 必要 | Default | 例 |
---|---|---|---|---|---|
時刻 | 期間 | データをバッチ処理する期間 | いいえ | 60s (バッチ処理が適用されるターゲットで) |
120s |
パス | Path | 出力に含める各メッセージ内の値へのパス。 | いいえ | .payload |
.payload.output |