操作方法: Lakehouse とノートブックのミラーリングさらた Azure Cosmos DB データに Microsoft Fabric からアクセスする (プレビュー)

このガイドでは、Lakehouse の ミラーリングされた Azure Cosmos DB データと Microsoft Fabric (プレビュー) からノートブックにアクセスする方法について説明します。

重要

Azure Cosmos DB のサポートは現在プレビュー段階です。 プレビュー期間中は、運用環境のワークロードはサポートされていません。 現在、Azure Cosmos DB for NoSQL アカウントのみでサポートされています。

前提条件

ヒント

パブリック プレビュー期間中は、バックアップから迅速に復旧できる既存の Azure Cosmos DB データのテスト コピーまたは開発コピーを使用することをお勧めします。

ミラーリングと前提条件をセットアップする

Azure Cosmos DB for NoSQL データベースのミラーリングを構成します。 ミラーリングの構成方法がわからない場合は、「ミラーデータベースの構成に関するチュートリアル」を参照してください。

  1. Fabric ポータルに移動します。

  2. Azure Cosmos DB アカウントの資格情報を使用して、新しい接続とミラーデータベースを作成します。

  3. レプリケーションがデータの初期スナップショットを完了するまで待ちます。

Lakehouse とノートブックのミラー データにアクセスする

Lakehouse を使用して、Azure Cosmos DB for NoSQL ミラーデータの分析に使用できるツールの数をさらに拡張します。 ここでは、Lakehouse を使用して Spark ノートブックを構築し、データのクエリを実行します。

  1. Fabric ポータルの [ホーム] に移動します。

  2. ナビゲーション メニューで [作成] を選択します。

  3. [作成] を選択し、[データ エンジニア] セクションを見つけて、[Lakehouse] を選択します。

  4. Lakehouse の名前を入力して、[作成] を選択します。

  5. 次に、[データの取得] を選択し、[新しいショートカット] を選択します。 ショートカット オプションの一覧から Microsoft OneLake を選択します。

  6. Fabric ワークスペース内のミラーされたデータベースの一覧から、ミラーリングされた Azure Cosmos DB for NoSQL データベースを選択します。 Lakehouse で使用するテーブルを選択し、[次へ] を選択して、[作成] を選択します。

  7. Lakehouse でテーブルのコンテキスト メニューを開き、[新規または既存のノートブック] を選択します。

  8. 新しいノートブックが自動的に開き、SELECT LIMIT 1000 を使用してデータフレームを読み込みます。

  9. SELECT * のように Spark を使用するなどのクエリを実行します。

    df = spark.sql("SELECT * FROM Lakehouse.OrdersDB_customers LIMIT 1000")
    display(df)
    

    ミラーされたデータベースからデータが事前に読み込まれている Lakehouse ノートブックのスクリーンショット。

    Note

    この例では、テーブルを想定しています。 Spark クエリを記述するときは、独自のテーブルを使用します。

Spark を使用して書き戻す

最後に、Spark と Python のコードを使用して、Fabric のノートブックからソース Azure Cosmos DB アカウントにデータを書き戻すことができます。 これを行って Cosmos DB に分析結果を書き戻し、OLTP アプリケーションのサービス プレーンとして使用できます。

  1. ノートブック内に 4 つのコード セルを作成します。

  2. 最初に、ミラー化されたデータに対してクエリを実行します。

    fMirror = spark.sql("SELECT * FROM Lakehouse1.OrdersDB_ordercatalog")
    

    ヒント

    これらのサンプル コード ブロックのテーブル名は、特定のデータ スキーマを前提としています。 これを独自のテーブル名と列名に置き換えてください。

  3. 次に、データを変換して集約します。

    dfCDB = dfMirror.filter(dfMirror.categoryId.isNotNull()).groupBy("categoryId").agg(max("price").alias("max_price"), max("id").alias("id"))
    
  4. 次に、資格情報、データベース名、コンテナー名を使用して Azure Cosmos DB for NoSQL アカウントに書き戻す Spark を構成します。

    writeConfig = {
      "spark.cosmos.accountEndpoint" : "https://xxxx.documents.azure.com:443/",
      "spark.cosmos.accountKey" : "xxxx",
      "spark.cosmos.database" : "xxxx",
      "spark.cosmos.container" : "xxxx"
    }
    
  5. 最後に、Spark を使用してソース データベースに書き戻します。

    dfCDB.write.mode("APPEND").format("cosmos.oltp").options(**writeConfig).save()
    
  6. すべてのコード セルを実行します。

    重要

    Azure Cosmos DB への書き込み操作では、要求ユニット (RU) が使用されます。