データフローを作成する

重要

Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。

一般公開されたリリースが利用可能になった場合に、新しい Azure IoT Operations を表示する必要があります。プレビュー段階のインストールをアップグレードすることはできません。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

データフローとは、データがソースから宛先までたどるパスであり、必要に応じて変換が行われます。 データフローを構成するには、Azure IoT Operations ポータルを使用するか、Dataflow カスタム リソースを作成します。 データフローを作成する前に、データ ソースと宛先のデータフロー エンドポイントを構成する必要があります。

MQTT ソース エンドポイント、変換、Kafka 宛先エンドポイントを含むデータフロー構成の例を次に示します。

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
spec:
  profileRef: my-dataflow-profile
  mode: enabled
  operations:
    - operationType: source
      name: my-source
      sourceSettings:
        endpointRef: mq
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
        serializationFormat: json
    - operationType: builtInTransformation
      name: my-transformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
        serializationFormat: json
    - operationType: destination
      name: my-destination
      destinationSettings:
        endpointRef: kafka
        dataDestination: factory
名前 説明
profileRef データフロー プロファイルへの参照
mode データフローのモード。 enabled または disabled
operations[] データフローによって実行される操作
operationType 操作の種類。 sourcedestination、または builtInTransformation

データフローの操作の種類を構成する方法については、以下のセクションを参照してください。

ソースの構成

データフローのソースを構成するには、エンドポイント参照とデータ ソースを指定します。 エンドポイントのデータ ソースの一覧を指定できます。 たとえば、MQTT や Kafka トピックです。 ソース エンドポイントとデータ ソースを含むデータフロー構成の例を次に示します。

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: mq-to-kafka
  namespace: azure-iot-operations
spec:
  profileRef: example-dataflow
  operations:
    - operationType: source
      sourceSettings:
        endpointRef: mq-source
        dataSources:
        - azure-iot-operations/data/thermostat
名前 説明
operationType ソース
sourceSettings source 操作の設定
sourceSettings.endpointRef source エンドポイントへの参照
sourceSettings.dataSources source 操作のデータ ソース。 ワイルドカード (#+) がサポートされています。

変換を構成する

変換操作では、宛先に送信する前にソースからのデータを変換できます。 変換は省略可能です。 データを変更する必要がない場合は、データフロー構成に変換操作を含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
        # ...
      filter:
        # ...
      map:
        # ...
名前 説明
operationType builtInTransformation
name 変換の名前
builtInTransformationSettings builtInTransformation 操作の設定
builtInTransformationSettings.datasets 一致するデータセットと条件を指定して、ソース データに他のデータを追加します
builtInTransformationSettings.filter 条件に基づいたデータのフィルター処理
builtInTransformationSettings.map 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します

エンリッチ: 参照データを追加する

データをエンリッチするには、Azure IoT Operations の分散状態ストア (DSS) の参照データセットを使用できます。 データセットは、条件に基づいてソース データにさらにデータを追加するために使用されます。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。

名前 説明
builtInTransformationSettings.datasets.key エンリッチメントに使用されるデータセット (DSS のキー)
builtInTransformationSettings.datasets.expression エンリッチメント操作の条件

たとえば、ソース データの deviceId フィールドを使用して、データセットの asset フィールドと一致させることができます。

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
      - key: assetDataset
        inputs:
          - $source.deviceId # ------------- $1
          - $context(assetDataset).asset # - $2
        expression: $1 == $2

データセットに asset フィールドを持つレコードがある場合は、次のようになります。

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

thermostat1 と一致する deviceId フィールドを持つソースのデータには、filtermap のステージで使用できる locationmanufacturer のフィールドがあります。

DSS セット ツール サンプルを使用して、サンプル データを分散状態ストア (DSS) に読み込むことができます。

条件構文の詳細については、「データフローを使用してデータをエンリッチする」とデータフローを使用したデータの変換に関する記事を参照してください。

フィルター: 条件に基づいてデータをフィルター処理する

条件に基づいてデータをフィルター処理するには、filter ステージを使用できます。 条件は、値と一致するソース データ内のフィールドとして指定されます。

名前 説明
builtInTransformationSettings.filter.inputs[] フィルター条件を評価するための入力
builtInTransformationSettings.filter.expression フィルター評価の条件

たとえば、ソース データの temperature フィールドを使用してデータをフィルター処理できます。

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      filter:
        - inputs:
          - temperature ? $last # - $1
          expression: "$1 > 20"

temperature フィールドが 20 より大きい場合、データは次のステージに渡されます。 temperature フィールドが 20 以下の場合、データはフィルター処理されます。

マップ: あるフィールドから別のフィールドにデータを移動する

省略可能な変換を使用してデータを別のフィールドにマップするには、map 操作を使用できます。 変換は、ソース データのフィールドを使用する数式として指定されます。

名前 説明
builtInTransformationSettings.map[].inputs[] マップ操作の入力
builtInTransformationSettings.map[].output マップ操作の出力フィールド
builtInTransformationSettings.map[].expression マップ操作の変換数式

たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、それを temperatureCelsius フィールドに格納できます。 コンテキスト化データセットの location フィールドを使用してソース データをエンリッチすることもできます。

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      map:
        - inputs:
          - temperature # - $1
          output: temperatureCelsius
          expression: "($1 - 32) * 5/9"
        - inputs:
          - $context(assetDataset).location  
          output: location

詳細については、「データフローを使用してデータをマッピングする」とデータフローを使用したデータの変換に関する記事を参照してください。

コピー先を構成する

データフローの宛先を構成するには、宛先のエンドポイントとパス (トピックまたはテーブル) を指定する必要があります。

名前 説明
destinationSettings.endpointRef destination エンドポイントへの参照
destinationSettings.dataDestination データの宛先

宛先エンドポイントの参照を構成する

宛先エンドポイントを構成するには、ID とエンドポイントの参照を指定する必要があります。

spec:
  operations:
  - operationType: destination
    name: destination1
    destinationSettings:
      endpointRef: eventgrid

宛先のパスを構成する

エンドポイントを取得したら、宛先のパスを構成できます。 宛先が MQTT または Kafka エンドポイントの場合は、パスを使用してトピックを指定します。

- operationType: destination
  destinationSettings:
    endpointRef: eventgrid
    dataDestination: factory

Microsoft Fabric などのストレージ エンドポイントの場合は、パスを使用してテーブル名を指定します。

- operationType: destination
  destinationSettings:
    endpointRef: adls
    dataDestination: telemetryTable