Azure Cosmos DB から Azure Data Explorer にデータを取り込む

Azure Data Explorer では、変更フィードを使った Azure Cosmos DB for NoSQL からのデータ インジェストがサポートされます。 Cosmos DB の変更フィード データ接続は、Cosmos DB の変更フィードをリッスンし、データを Data Explorer テーブルに取り込むインジェスト パイプラインです。 変更フィードは、新しいドキュメントと更新されたドキュメントをリッスンしますが、削除はログに記録しません。 Azure Data Explorer でのデータ インジェストに関する一般的な情報については、「Azure Data Explorer のデータ インジェスト概要」を参照してください。

各データ接続は、特定の Cosmos DB コンテナーをリッスンし、指定されたテーブルにデータを取り込みます (1 つのテーブルに複数の接続を取り込むことができます)。 インジェスト方法として、ストリーミング インジェスト (有効な場合) とキューによるインジェストがサポートされます。

この記事では、Cosmos DB の変更フィード データ接続を設定して、システム マネージド ID を使用して Azure Data Explorer にデータを取り込む方法について説明します。 開始する前に、考慮事項を確認します。

コネクタを設定するには、次の手順を実行します。

手順 1: Azure Data Explorer テーブルを選択し、そのテーブル マッピングを構成する

手順 2: Cosmos DB データ接続を作成する

手順 3: データ接続をテストする

前提条件

手順 1: Azure Data Explorer テーブルを選択し、そのテーブル マッピングを構成する

データ接続を作成する前に、取り込まれたデータを格納し、ソース Cosmos DB コンテナー内のスキーマに一致するマッピングを適用するテーブルを作成します。 フィールドの単純なマッピング以上のものが必要なシナリオでは、更新ポリシーを使用して、変更フィードから取り込まれたデータを変換およびマップできます。

Cosmos DB コンテナー内の項目のサンプル スキーマを次に示します。

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

次の手順を使用してテーブルを作成し、テーブル マッピングを適用します。

  1. Azure Data Explorer Web UI で、左側のナビゲーション メニューから [クエリ] を選択し、テーブルを作成するデータベースを選択します。

  2. 次のコマンドを実行して、TestTable という名前のテーブルを作成します。

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. 次のコマンドを実行して、テーブル マッピングを作成します。

    このコマンドは、次のように、Cosmos DB JSON ドキュメントのカスタム プロパティを TestTable テーブル内の列にマップします。

    Cosmos DB プロパティ テーブル列 変換
    id Id なし
    name 名前 なし
    _ts _ts なし
    _ts _timestamp DateTimeFromUnixSeconds を使用して _ts (UNIX 秒) を _timestamp (datetime)) に変換します

    Note

    次のタイムスタンプ列を使用することをお勧めします。

    • _ts: この列を使用して、Cosmos DB でデータを調整します。
    • _timestamp: この列を使用して、Kusto クエリで効率的なタイム フィルターを実行します。 詳細については、「クエリのベスト プラクティス」を参照してください。
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

更新ポリシーを使用してデータを変換およびマップする

フィールドの単純なマッピング以上のものが必要なシナリオでは、更新ポリシーを使用して、変更フィードから取り込まれたデータを変換およびマップできます。

更新ポリシーは、テーブルに取り込まれるデータを変換するための 1 つの方法です。 これらは Kusto 照会言語で記述され、インジェスト パイプラインで実行されます。 これらは、次のシナリオのように、Cosmos DB の変更フィードの取り込みからデータを変換するために使用できます。

  • ドキュメントに含まれる配列が mv-expand 演算子を使用して複数の行で変換されていると、クエリがより実行しやすくなる場合。
  • ドキュメントをフィルターで除外する場合。 たとえば、where 演算子を使用して、種類別にドキュメントをフィルターで除外できます。
  • テーブル マッピングでは表現できない複雑なロジックがある場合。

更新ポリシーを作成および管理する方法については、「更新ポリシーの概要」を参照してください。

手順 2: Cosmos DB データ接続を作成する

次の方法を使用して、データ コネクタを作成できます。

  1. Azure portal でクラスターの概要ページに移動し、[作業の開始] タブを選択します。

  2. [データ インジェスト] タイルで、[データ接続を作成する]>[Cosmos DB] を選択します。

    [作業の開始] タブのスクリーンショット。Cosmos DB データ接続の作成オプションが表示されています。

  3. Cosmos DB の [データ接続を作成する] ウィンドウで、フォームに次の表の情報を入力します。

    データ接続ウィンドウのスクリーンショット。フォーム フィールドに値が表示されています。

    フィールド 説明
    データベース名 データを取り込む Azure Data Explorer データベースを選択します。
    データ接続名 データ接続の名前を指定します。
    サブスクリプション Cosmos DB NoSQL アカウントを含むサブスクリプションを選択します。
    Cosmos DB アカウント データの取り込み元となる Cosmos DB アカウントを選択します。
    SQL データベース データの取り込み元となる Cosmos DB データベースを選択します。
    SQL コンテナー データの取り込み元となる Cosmos DB コンテナーを選択します。
    テーブル名 データの取り込み先となる Azure Data Explorer のテーブル名を指定します。
    マッピング名 必要に応じて、データ接続に使用するマッピング名を指定します。
  4. 必要に応じて、[詳細設定] セクションで、次の操作を実行します。

    1. [イベント取得の開始日] を指定します。 これは、コネクタがデータの取り込みを開始する時刻です。 時刻を指定しない場合、コネクタはデータ接続を作成した時点からデータの取り込みを開始します。 推奨される日付形式は ISO 8601 UTC 標準で、次のように指定します: yyyy-MM-ddTHH:mm:ss.fffffffZ

    2. [ユーザー割り当て] を選択し、ID を選択します。 既定では、システム割り当てのマネージド ID が接続で使用されます。 必要に応じて、ユーザー割り当て の ID を使用できます。

      データ接続ウィンドウのスクリーンショット。[詳細設定] が表示されています。

  5. [作成] を選択して、データ接続を作成します。

手順 3: データ接続をテストする

  1. Cosmos DB コンテナーに、次のドキュメントを挿入します。

    {
        "name":"Cousteau"
    }
    
  2. Azure Data Explorer Web UI で、次のクエリを実行します。

    TestTable
    

    結果セットは次の図のようになります。

    取り込まれたドキュメントを示す結果ウィンドウのスクリーンショット。

Note

Azure Data Explorer には、インジェスト プロセスを最適化することを目的とした、キューに登録されたデータ インジェストの集計 (バッチ処理) ポリシーがあります。 既定のバッチ処理ポリシーは、バッチについて次の条件のいずれかが真になると、バッチを封印するように構成されています。最大遅延時間 5 分、合計サイズ 1 GB、または 1,000 個の BLOB。 そのため、待機時間が発生する可能性があります。 詳細については、バッチ処理ポリシーに関するページを参照してください。 待機時間を短縮するには、ストリーミングをサポートするようにテーブルを構成します。 ストリーミング ポリシーに関するページを参照してください。

考慮事項

Cosmos DB の変更フィードには、次の考慮事項が適用されます。

  • 変更フィードは削除イベントを公開しません。

    Cosmos DB の変更フィードには、新規および更新されたドキュメントのみが含まれます。 削除されたドキュメントについて知る必要がある場合は、ソフト マーカーを使って Cosmos DB ドキュメントを削除済みとしてマークするようにフィードを構成します。 ドキュメントが削除されたかどうかを示す更新イベントにプロパティが追加されます。 その後、クエリで where 演算子を使用してフィルターで除外できます。

    たとえば、削除されたプロパティを IsDeleted というテーブル列にマップした場合、次のクエリを使用して、削除されたドキュメントをフィルターで除外できます。

    TestTable
    | where not(IsDeleted)
    
  • 変更フィードは、ドキュメントの最新の更新のみを公開します。

    2 番目の考慮事項の影響を理解するには、次のシナリオを確認します。

    Cosmos DB コンテナーには、A および B のドキュメントが含まれています。foo というプロパティに対する変更を次の表に示します。

    ドキュメント ID プロパティ foo イベント ドキュメントのタイムスタンプ (_ts)
    A 作成 10
    B 作成 20
    A オレンジ 更新する 30
    A Pink 更新プログラム 40
    B Violet 更新する 50
    A Carmine 更新する 50
    B NeonBlue 更新する 70

    変更フィード API は、データ コネクタによって一定の間隔 (通常は数秒ごと) でポーリングされます。 各ポーリングには、呼び出しの間にコンテナーで発生した変更が含まれます (ただし、ドキュメントごとの最新バージョンの変更のみ)

    この問題を説明するために、次の表に示すように、タイムスタンプ 153555、および 75 の一連の API 呼び出しを検討します。

    API 呼び出しのタイムスタンプ ドキュメント ID プロパティ foo ドキュメントのタイムスタンプ (_ts)
    15 A 10
    35 B 20
    35 A オレンジ 30
    55 B Violet 50
    55 A Carmine 60
    75 B NeonBlue 70

    API の結果と Cosmos DB ドキュメントで行われた変更の一覧を比較すると、一致しないことがわかります。 タイムスタンプ 40 の変更テーブルで強調表示されているドキュメント A への更新イベントは、API 呼び出しの結果には表示されません。

    イベントが表示されない理由を理解するために、タイムスタンプ 35 と 55 の API 呼び出し間のドキュメント A の変更を調べます。 これら 2 つの呼び出しの間に、ドキュメント A は次のように 2 回変更されました。

    ドキュメント ID プロパティ foo イベント ドキュメントのタイムスタンプ (_ts)
    A ピンク 更新する 40
    A Carmine 更新する 50

    タイムスタンプ 55 で API 呼び出しが行われると、変更フィード API はドキュメントの最新バージョンを返します。 この場合、ドキュメント A の最新バージョンはタイムスタンプ 50 の更新です。これは、プロパティ foo での Pink から Carmine への更新です。

    このシナリオが原因で、データ コネクタは中間ドキュメントの変更を見落とす可能性があります。 たとえば、データ接続サービスが数分間ダウンしている場合や、ドキュメントの変更頻度が API ポーリング頻度よりも高い場合は、一部のイベントが見落とされることがあります。 ただし、各ドキュメントの最新の状態はキャプチャされます。

  • Cosmos DB コンテナーの削除と再作成はサポートされていません

    Azure Data Explorer では、フィード内の "位置" をチェックポイント処理することで、変更フィードを追跡します。 これは、コンテナーの各物理パーティションで継続トークンを使って行われます。 コンテナーが削除または再作成されると、これらの継続トークンは無効になり、リセットされません。データ接続を削除して再作成する必要があります。

コストを見積もる

Cosmos DB データ接続を使用すると、Cosmos DB コンテナーの要求ユニット (RU) の使用量にどの程度影響しますか?

コネクタは、コンテナーの各物理パーティションで Cosmos DB の変更フィード API を最大で 1 秒に 1 回呼び出します。 これらの呼び出しには、次のコストが関連します。

コスト 説明
固定費 固定コストは、物理パーティションごとに 1 秒あたり約 2 RU です。
変動費 変動コストは、ドキュメントの記述に使用される RU の約 2% ですが、シナリオによって異なる可能性があります。 たとえば、Cosmos DB コンテナーに 100 個のドキュメントを書き込む場合、それらのドキュメントを書き込むコストは 1,000 RU です。 これらのドキュメントを読み取るためにコネクタを使用するための対応するコストは、書き込みコストの約 2% (約 20 RU) です。