Azure Cosmos DB for NoSQL の Spark 3 コネクタでサービス プリンシパルを使う

この記事では、ロールベースのアクセス制御で使用できる、Microsoft Entra のアプリケーションとサービス プリンシパルを作成する方法について説明します。 その後、このサービス プリンシパルを使って、Spark 3 から Azure Cosmos DB for NoSQL アカウントに接続できるようになります。

前提条件

シークレットを作成して資格情報を記録する

このセクションでは、クライアント シークレットを作成し、後で使用できるように値を記録します。

  1. Azure Portalを開きます。

  2. 既存の Microsoft Entra アプリケーションに移動します。

  3. [証明書とシークレット] ページに移動します。 次に新しいシークレットを作成します。 この記事の後半で使うために、[クライアント シークレット] の値を保存します。

  4. [概要] ページに移動します。 [アプリケーション (クライアント) ID][オブジェクト ID][ディレクトリ (テナント) ID] の値を見つけて記録します。 これらの値は、この記事の後半でも使います。

  5. 既存の Azure Cosmos DB for NoSQL アカウントに移動します。

  6. [概要] ページの [URI] 値を記録します。 また、[サブスクリプション ID][リソース グループ] の値も記録します。 これらの値は、この記事で後ほど使用します。

定義と割り当てを作成する

このセクションでは、Microsoft Entra ID ロール定義を作成します。 次に、コンテナー内の項目の読み取りと書き込みを行うアクセス許可を持つロールを割り当てます。

  1. az role definition create コマンドを使用して、ロールを作成します。 Azure Cosmos DB for NoSQL のアカウント名とリソース グループを渡し、その後にカスタム ロールを定義する JSON の本文を渡します。 ロールのスコープも / を使用してアカウント レベルに設定されます。 ロールには、必ず要求本文の RoleName プロパティを使って一意の名前を付けます。

    az cosmosdb sql role definition create \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>" \
        --body '{
            "RoleName": "<role-definition-name>",
            "Type": "CustomRole",
            "AssignableScopes": ["/"],
            "Permissions": [{
                "DataActions": [
                    "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                    "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                    "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
                ]
            }]
        }'
    
  2. 作成したロール定義を列挙し、その一意の識別子を JSON 出力でフェッチします。 JSON 出力の id 値を記録します。

    az cosmosdb sql role definition list \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>"
    
    [
      {
        ...,
        "id": "/subscriptions/<subscription-id>/resourceGroups/<resource-grou-name>/providers/Microsoft.DocumentDB/databaseAccounts/<account-name>/sqlRoleDefinitions/<role-definition-id>",
        ...
        "permissions": [
          {
            "dataActions": [
              "Microsoft.DocumentDB/databaseAccounts/readMetadata",
              "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
              "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
            ],
            "notDataActions": []
          }
        ],
        ...
      }
    ]
    
  3. az cosmosdb sql role assignment create を使ってロールの割り当てを作成します。 <aad-principal-id> を、この記事の前半で記録した [オブジェクト ID] に置き換えます。 また、<role-definition-id> を、前の手順で az cosmosdb sql role definition list コマンドを実行してフェッチした id 値に置き換えます。

    az cosmosdb sql role assignment create \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>" \
        --scope "/" \
        --principal-id "<account-name>" \
        --role-definition-id "<role-definition-id>"
    

サービス プリンシパルを使用する

Microsoft Entra アプリケーションとサービス プリンシパルを作成し、カスタム ロールを作成し、そのロールに Azure Cosmos DB for NoSQL アカウントへのアクセス許可を割り当てたので、ノートブックを実行できるようになります。

  1. Azure Databricks ワークスペースを開きます。

  2. ワークスペースのインターフェイスで、新しいクラスターを作成します。 少なくとも次の設定でクラスターを構成します。

    バージョン Value
    ランタイムのバージョン 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. ワークスペースのインターフェイスを使って、グループ IDcom.azure.cosmos.spark である Maven パッケージを Maven Central で検索します。 成果物 ID の前に azure-cosmos-spark_3-4 が付いている Spark 3.4 固有のパッケージを、クラスターにインストールします。

  4. 最後に、新しいノートブックを作成します。

    ヒント

    既定では、ノートブックは最近作成されたクラスターにアタッチされます。

  5. ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名に関する Azure Cosmos DB Spark コネクタ構成設定を設定します。 この記事の前半で記録した [サブスクリプション ID][リソース グループ][アプリケーション (クライアント) ID][ディレクトリ (テナント) ID][クライアント シークレット] の値を使います。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.auth.type": "ServicePrincipal",
      "spark.cosmos.account.subscriptionId": "<subscription-id>",
      "spark.cosmos.account.resourceGroupName": "<resource-group-name>",
      "spark.cosmos.account.tenantId": "<entra-tenant-id>",
      "spark.cosmos.auth.aad.clientId": "<entra-app-client-id>",
      "spark.cosmos.auth.aad.clientSecret": "<entra-app-client-secret>",
      "spark.cosmos.database": "<database-name>",
      "spark.cosmos.container": "<container-name>"        
    }    
    
    // Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.auth.type" -> "ServicePrincipal",
      "spark.cosmos.account.subscriptionId" -> "<subscription-id>",
      "spark.cosmos.account.resourceGroupName" -> "<resource-group-name>",
      "spark.cosmos.account.tenantId" -> "<entra-tenant-id>",
      "spark.cosmos.auth.aad.clientId" -> "<entra-app-client-id>",
      "spark.cosmos.auth.aad.clientSecret" -> "<entra-app-client-secret>",
      "spark.cosmos.database" -> "<database-name>",
      "spark.cosmos.container" -> "<container-name>" 
    )
    
  6. Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。

    # Configure Catalog Api
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
    
    // Configure Catalog Api
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
    
  7. CREATE DATABASE IF NOT EXISTS を使用して新しいデータベースを作成します。 データベース名を必ず指定してください。

    # Create a database using the Catalog API
    spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format("<database-name>"))
    
    // Create a database using the Catalog API
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.<database-name>;")
    
  8. 指定したデータベース名、コンテナー名、パーティション キー パス、スループットの値を使って、新しいコンテナーを作成します。

    # Create a products container using the Catalog API
    spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '{}', manualThroughput = '{}')".format("<database-name>", "<container-name>", "<partition-key-path>", "<throughput>"))
    
    // Create a products container using the Catalog API
    spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.<database-name>.<container-name> using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '<partition-key-path>', manualThroughput = '<throughput>')")
    
  9. サンプル データセットを作成します。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  10. spark.createDataFrame と以前に保存したオンライン トランザクション処理 (OLTP) 構成を使用して、ターゲット コンテナーにサンプル データを追加します。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

    ヒント

    このクイック スタートの例では、資格情報がクリア テキストで変数に割り当てられます。 セキュリティを確保するために、シークレットを使用することをお勧めします。 シークレットの構成の詳細については、Spark 構成にシークレットを追加する方法に関する記事を参照してください。