データフローを作成する
重要
Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。
一般公開されたリリースが利用可能になった場合に、新しい Azure IoT Operations を表示する必要があります。プレビュー段階のインストールをアップグレードすることはできません。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
データフローとは、データがソースから宛先までたどるパスであり、必要に応じて変換が行われます。 データフローを構成するには、Azure IoT Operations ポータルを使用するか、Dataflow カスタム リソースを作成します。 データフローを作成する前に、データ ソースと宛先のデータフロー エンドポイントを構成する必要があります。
MQTT ソース エンドポイント、変換、Kafka 宛先エンドポイントを含むデータフロー構成の例を次に示します。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
spec:
profileRef: my-dataflow-profile
mode: enabled
operations:
- operationType: source
name: my-source
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
serializationFormat: json
- operationType: builtInTransformation
name: my-transformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
serializationFormat: json
- operationType: destination
name: my-destination
destinationSettings:
endpointRef: kafka
dataDestination: factory
名前 | 説明 |
---|---|
profileRef | データフロー プロファイルへの参照 |
mode | データフローのモード。 enabled または disabled |
operations[] | データフローによって実行される操作 |
operationType | 操作の種類。 source、destination、または builtInTransformation |
データフローの操作の種類を構成する方法については、以下のセクションを参照してください。
ソースの構成
データフローのソースを構成するには、エンドポイント参照とデータ ソースを指定します。 エンドポイントのデータ ソースの一覧を指定できます。 たとえば、MQTT や Kafka トピックです。 ソース エンドポイントとデータ ソースを含むデータフロー構成の例を次に示します。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: mq-to-kafka
namespace: azure-iot-operations
spec:
profileRef: example-dataflow
operations:
- operationType: source
sourceSettings:
endpointRef: mq-source
dataSources:
- azure-iot-operations/data/thermostat
名前 | 説明 |
---|---|
operationType | ソース |
sourceSettings | source 操作の設定 |
sourceSettings.endpointRef | source エンドポイントへの参照 |
sourceSettings.dataSources | source 操作のデータ ソース。 ワイルドカード (# と + ) がサポートされています。 |
変換を構成する
変換操作では、宛先に送信する前にソースからのデータを変換できます。 変換は省略可能です。 データを変更する必要がない場合は、データフロー構成に変換操作を含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
# ...
filter:
# ...
map:
# ...
名前 | 説明 |
---|---|
operationType | builtInTransformation |
name | 変換の名前 |
builtInTransformationSettings | builtInTransformation 操作の設定 |
builtInTransformationSettings.datasets | 一致するデータセットと条件を指定して、ソース データに他のデータを追加します |
builtInTransformationSettings.filter | 条件に基づいたデータのフィルター処理 |
builtInTransformationSettings.map | 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します |
エンリッチ: 参照データを追加する
データをエンリッチするには、Azure IoT Operations の分散状態ストア (DSS) の参照データセットを使用できます。 データセットは、条件に基づいてソース データにさらにデータを追加するために使用されます。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。
名前 | 説明 |
---|---|
builtInTransformationSettings.datasets.key | エンリッチメントに使用されるデータセット (DSS のキー) |
builtInTransformationSettings.datasets.expression | エンリッチメント操作の条件 |
たとえば、ソース データの deviceId
フィールドを使用して、データセットの asset
フィールドと一致させることができます。
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
データセットに asset
フィールドを持つレコードがある場合は、次のようになります。
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
thermostat1
と一致する deviceId
フィールドを持つソースのデータには、filter
と map
のステージで使用できる location
と manufacturer
のフィールドがあります。
DSS セット ツール サンプルを使用して、サンプル データを分散状態ストア (DSS) に読み込むことができます。
条件構文の詳細については、「データフローを使用してデータをエンリッチする」とデータフローを使用したデータの変換に関する記事を参照してください。
フィルター: 条件に基づいてデータをフィルター処理する
条件に基づいてデータをフィルター処理するには、filter
ステージを使用できます。 条件は、値と一致するソース データ内のフィールドとして指定されます。
名前 | 説明 |
---|---|
builtInTransformationSettings.filter.inputs[] | フィルター条件を評価するための入力 |
builtInTransformationSettings.filter.expression | フィルター評価の条件 |
たとえば、ソース データの temperature
フィールドを使用してデータをフィルター処理できます。
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
temperature
フィールドが 20 より大きい場合、データは次のステージに渡されます。 temperature
フィールドが 20 以下の場合、データはフィルター処理されます。
マップ: あるフィールドから別のフィールドにデータを移動する
省略可能な変換を使用してデータを別のフィールドにマップするには、map
操作を使用できます。 変換は、ソース データのフィールドを使用する数式として指定されます。
名前 | 説明 |
---|---|
builtInTransformationSettings.map[].inputs[] | マップ操作の入力 |
builtInTransformationSettings.map[].output | マップ操作の出力フィールド |
builtInTransformationSettings.map[].expression | マップ操作の変換数式 |
たとえば、ソース データの temperature
フィールドを使用して温度を摂氏に変換し、それを temperatureCelsius
フィールドに格納できます。 コンテキスト化データセットの location
フィールドを使用してソース データをエンリッチすることもできます。
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
output: temperatureCelsius
expression: "($1 - 32) * 5/9"
- inputs:
- $context(assetDataset).location
output: location
詳細については、「データフローを使用してデータをマッピングする」とデータフローを使用したデータの変換に関する記事を参照してください。
コピー先を構成する
データフローの宛先を構成するには、宛先のエンドポイントとパス (トピックまたはテーブル) を指定する必要があります。
名前 | 説明 |
---|---|
destinationSettings.endpointRef | destination エンドポイントへの参照 |
destinationSettings.dataDestination | データの宛先 |
宛先エンドポイントの参照を構成する
宛先エンドポイントを構成するには、ID とエンドポイントの参照を指定する必要があります。
spec:
operations:
- operationType: destination
name: destination1
destinationSettings:
endpointRef: eventgrid
宛先のパスを構成する
エンドポイントを取得したら、宛先のパスを構成できます。 宛先が MQTT または Kafka エンドポイントの場合は、パスを使用してトピックを指定します。
- operationType: destination
destinationSettings:
endpointRef: eventgrid
dataDestination: factory
Microsoft Fabric などのストレージ エンドポイントの場合は、パスを使用してテーブル名を指定します。
- operationType: destination
destinationSettings:
endpointRef: adls
dataDestination: telemetryTable