データ プロセッサ パイプラインから Microsoft Fabric にデータを送信する

重要

Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

Fabric Lakehouse の変換先を使用して、Azure IoT Data Processor プレビュー パイプラインから Microsoft Fabric のレイクハウスにデータを書き込みます。 変換先ステージで Parquet ファイルが Lakehouse に書き込まれ、データをデルタ テーブルで閲覧できます。 変換先ステージは、メッセージを Microsoft Fabric に送信する前にバッチ処理します。

前提条件

Microsoft Fabric の変換先パイプライン ステージを構成して使用するには、次のものが必要です。

Microsoft Fabric をセットアップする

データ パイプラインから Microsoft Fabric に書き込む前に、パイプラインからレイクハウスへのアクセス権を付与する必要があります。 サービス プリンシパルまたはマネージド ID を使用して、パイプラインを認証できます。 マネージド ID を使用する利点は、サービス プリンシパルのライフサイクルを管理する必要がないことです。 マネージド ID は Azure によって自動的に管理され、割り当てられているリソースのライフサイクルに関連付けられます。

レイクハウスへのサービス プリンシパルまたはマネージド ID アクセスを構成する前に、サービス プリンシパル認証を有効にします。

クライアント シークレットを持つサービス プリンシパルを作成するには、次のようにします。

  1. サービス プリンシパルを作成するには、次の Azure CLI コマンドを使います。

    az ad sp create-for-rbac --name <YOUR_SP_NAME> 
    
  2. このコマンドの出力には、appIddisplayNamepasswordtenant が含まれます。 Microsoft Fabric などのクラウド リソースへのアクセスの構成、シークレットの作成、パイプラインの変換先の構成を行うときに使うので、これらの値を記録しておきます。

    {
        "appId": "<app-id>",
        "displayName": "<name>",
        "password": "<client-secret>",
        "tenant": "<tenant-id>"
    }
    

Microsoft Fabric ワークスペースにサービス プリンシパルを追加するには:

  1. ワークスペース ID と Lakehouse ID を書き留めます。 これらの値は、Lakehouse へのアクセスに使用する URL にあります。

    https://msit.powerbi.com/groups/<your workspace ID>/lakehouses/<your lakehouse ID>?experience=data-engineering

  2. ワークスペースで、[アクセス管理] を選択します。

    [アクセスの管理] リンクの場所を示すスクリーンショット。

  3. [ユーザーまたはグループの追加] を選択します。

    ユーザーを追加する方法を示すスクリーンショット。

  4. サービス プリンシパルを名前で検索します。 入力を開始すると、一致するサービス プリンシパルの一覧が表示されます。 以前に作成したサービス プリンシパルを選択します。

    サービス プリンシパルを追加する方法を示すスクリーンショット。

  5. ワークスペースへのサービス プリンシパル管理者アクセス権を付与します。

シークレットを構成する

変換先ステージが Microsoft Fabric に接続するようにするには、認証の詳細を含むシークレットにアクセスする必要があります。 シークレットを作成するには:

  1. 次のコマンドを使って、サービス プリンシパルの作成時に記録したクライアント シークレットを含むシークレットを Azure Key Vault に追加します。

    az keyvault secret set --vault-name <your-key-vault-name> --name AccessFabricSecret --value <client-secret>
    
  2. Azure IoT Operations Preview デプロイ用のシークレットを管理する」のステップに従って、Kubernetes クラスターへのシークレット参照を追加します。

変換先ステージを構成する

Fabric Lakehouse の変換先ステージの JSON 構成で、ステージの詳細を定義します。 ステージを作成するには、フォーム ベースの UI を使用するか、[詳細設定] タブで次の JSON 構成を指定します。

フィールド タイプ 内容 必要 Default
[表示名] String データ プロセッサ UI に表示する名前。 はい - Azure IoT MQ output
説明 String ステージの実行内容を示すわかりやすい説明。 いいえ Write to topic default/topic1
WorkspaceId String Lakehouse のワークスペース ID。 はい -
LakehouseId String Lakehouse の Lakehouse ID。 はい -
テーブル String 書き込むテーブルの名前。 はい -
ファイル パス1 [テンプレート] Parquet ファイルの書き込み先のファイル パス。 いいえ {{{instanceId}}}/{{{pipelineId}}}/{{{partitionId}}}/{{{YYYY}}}/{{{MM}}}/{{{DD}}}/{{{HH}}}/{{{mm}}}/{{{fileNumber}}}
バッチ2 Batch データをバッチ処理する方法。 いいえ 60s 10s
認証4 String Azure Data Explorer に接続する認証の詳細。 Service principal または Managed identity サービス プリンシパル はい -
再試行 再試行 使用する再試行ポリシー。 いいえ default fixed
列 > 名前 string 列の名前。 はい temperature
列 > 型3 string enum デルタ プリミティブ型のいずれかを使用して、列に保持されるデータの型。 はい integer
列 > パス Path 列の値を読み取るデータの各レコード内の場所。 いいえ .{{name}} .temperature

1ファイル パス: Microsoft Fabric にファイルを書き込むには、ファイル パスが必要です。 テンプレート を使用して、ファイル パスを構成できます。 ファイル パスには、次のコンポーネントを任意の順序で含む必要があります。

  • instanceId
  • pipelineId
  • partitionId
  • YYYY
  • MM
  • DD
  • HH
  • mm
  • fileNumber

ファイル名は、fileNumberで示される増分整数値です。 システムにファイルの種類を認識させる場合は、必ずファイル拡張子を含めるようにしてください。

2バッチ処理: データを Microsoft Fabric に書き込む場合はバッチ処理が必須です。 変換先ステージは、構成可能な時間間隔でメッセージをバッチ処理します。

バッチ処理間隔を構成しない場合、ステージでは既定として 60 秒が使用されます。

3型: データ プロセッサでは、デルタ形式を使用して Microsoft Fabric に書き込みます。 データ プロセッサは、decimaltimestamp without time zone を除くすべてのデルタ プリミティブ データ型をサポートします。

Microsoft Fabric ですべての日付と時刻が正しく表されるようにするには、プロパティの値が有効な RFC 3339 文字列であり、データ型が date または timestamp であることを確認します。

1認証: 現在、変換先ステージでは、Microsoft Fabric に接続するときにサービス プリンシパル ベースの認証またはマネージド ID がサポートされています。

サービス プリンシパル ベースの認証

サービス プリンシパル ベースの認証を構成するには、次の値を指定します。 サービス プリンシパルを作成し、クラスターにシークレット参照を追加したときに、これらの値をメモしました。

フィールド Description 必須
TenantId テナント ID。 はい
ClientId データベースへのアクセス権を持つサービス プリンシパルを作成するときに記録したアプリ ID。 はい
Secret クラスターで作成したシークレット参照。 はい

サンプル構成

次の JSON 例は、データベースの quickstart テーブルにメッセージ全体を書き込む Microsoft Fabric Lakehouse の 変換先ステージの完全な構成を示しています。

{
    "displayName": "Fabric Lakehouse - 520f54",
    "type": "output/fabric@v1",
    "viewOptions": {
        "position": {
            "x": 0,
            "y": 784
        }
    },
    "workspace": "workspaceId",
    "lakehouse": "lakehouseId",
    "table": "quickstart",
    "columns": [
        {
            "name": "Timestamp",
            "type": "timestamp",
            "path": ".Timestamp"
        },
        {
            "name": "AssetName",
            "type": "string",
            "path": ".assetname"
        },
        {
            "name": "Customer",
            "type": "string",
            "path": ".Customer"
        },
        {
            "name": "Batch",
            "type": "integer",
            "path": ".Batch"
        },
        {
            "name": "CurrentTemperature",
            "type": "float",
            "path": ".CurrentTemperature"
        },
        {
            "name": "LastKnownTemperature",
            "type": "float",
            "path": ".LastKnownTemperature"
        },
        {
            "name": "Pressure",
            "type": "float",
            "path": ".Pressure"
        },
        {
            "name": "IsSpare",
            "type": "boolean",
            "path": ".IsSpare"
        }
    ],
    "authentication": {
        "type": "servicePrincipal",
        "tenantId": "tenantId",
        "clientId": "clientId",
        "clientSecret": "secretReference"
    },
    "batch": {
        "time": "5s",
        "path": ".payload"
    },
    "retry": {
        "type": "fixed",
        "interval": "20s",
        "maxRetries": 4
    }
}

この構成は次のことを定義します。

  • メッセージは 5 秒間でバッチ処理されます。
  • バッチ パス .payload を使用して、列のデータを検索します。

次の例は、Microsoft Fabric の変換先ステージへの入力メッセージのサンプルを示しています:

{
  "payload": {
    "Batch": 102,
    "CurrentTemperature": 7109,
    "Customer": "Contoso",
    "Equipment": "Boiler",
    "IsSpare": true,
    "LastKnownTemperature": 7109,
    "Location": "Seattle",
    "Pressure": 7109,
    "Timestamp": "2023-08-10T00:54:58.6572007Z",
    "assetName": "oven"
  },
  "qos": 0,
  "systemProperties": {
    "partitionId": 0,
    "partitionKey": "quickstart",
    "timestamp": "2023-11-06T23:42:51.004Z"
  },
  "topic": "quickstart"
}