你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将服务主体与适用于 Azure Cosmos DB for NoSQL 的 Spark 3 连接器配合使用

在本文中,你将了解如何创建可以与基于角色的访问控制配合使用的 Microsoft Entra 应用程序和服务主体。 然后,你可以使用此服务主体从 Spark 3 连接到 Azure Cosmos DB for NoSQL 帐户。

先决条件

创建机密和记录凭据

在本部分中,你将创建一个客户端密码,并记录该值以备后用。

  1. 打开 Azure 门户

  2. 转到现有的 Microsoft Entra 应用程序。

  3. 前往“证书和机密”页。 然后,创建新机密。 保存“客户端密码”值,以便稍后在本文中使用。

  4. 转至“概述”页面。 找到并记录“应用程序(客户端)ID”、“对象 ID”和“目录(租户)ID”的值。 本文稍后会使用这些值。

  5. 转到现有的 Azure Cosmos DB for NoSQL 帐户。

  6. 记录“概述”页上的“URI”值。 另请记录“订阅 ID”和“资源组”值。 本文稍后将使用这些值。

创建定义和分配

在本部分中,你将创建 Microsoft Entra ID 角色定义。 然后,分配具有读取和写入容器中项的权限的角色。

  1. 使用 az role definition create 命令创建角色。 传入 Azure Cosmos DB for NoSQL 帐户名和资源组,然后是用于定义自定义角色的 JSON 主体。 使用 / 将角色的范围限定为帐户级别。 确保使用请求正文的 RoleName 属性为角色提供唯一名称。

    az cosmosdb sql role definition create \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>" \
        --body '{
            "RoleName": "<role-definition-name>",
            "Type": "CustomRole",
            "AssignableScopes": ["/"],
            "Permissions": [{
                "DataActions": [
                    "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                    "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                    "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
                ]
            }]
        }'
    
  2. 列出你创建的角色定义,以在 JSON 输出中提取其唯一标识符。 记录 JSON 输出的 id 值。

    az cosmosdb sql role definition list \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>"
    
    [
      {
        ...,
        "id": "/subscriptions/<subscription-id>/resourceGroups/<resource-grou-name>/providers/Microsoft.DocumentDB/databaseAccounts/<account-name>/sqlRoleDefinitions/<role-definition-id>",
        ...
        "permissions": [
          {
            "dataActions": [
              "Microsoft.DocumentDB/databaseAccounts/readMetadata",
              "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
              "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
            ],
            "notDataActions": []
          }
        ],
        ...
      }
    ]
    
  3. 使用 az cosmosdb sql role assignment create 创建角色分配。 将“<aad-principal-id>”替换为在本文前面记录的“对象 ID”。 此外,将“<role-definition-id>”替换为在上文步骤中通过运行 az cosmosdb sql role definition list 命令获取的“id”值。

    az cosmosdb sql role assignment create \
        --resource-group "<resource-group-name>" \
        --account-name "<account-name>" \
        --scope "/" \
        --principal-id "<account-name>" \
        --role-definition-id "<role-definition-id>"
    

使用服务主体

现在,你已经创建了一个 Microsoft Entra 应用程序和服务主体,创建了一个自定义角色,并为该角色分配了 Azure Cosmos DB for NoSQL 帐户,你应该能够运行你的笔记本。

  1. 打开 Azure Databricks 工作区。

  2. 在工作区界面中,创建新的群集。 至少使用以下设置配置群集:

    版本
    运行时版本 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. 使用工作区界面从 Maven Central 搜索组 ID 为 com.azure.cosmos.spark 的 Maven 包。 将特定于 Spark 3.4 且项目 ID 前缀为 azure-cosmos-spark_3-4 的包安装到群集

  4. 最后,创建新的笔记本

    提示

    默认情况下,笔记本会附加到最近创建的群集。

  5. 在笔记本中,设置 NoSQL 帐户终结点、数据库名称和容器名称的 Azure Cosmos DB Spark 连接器配置设置。 使用在本文前面记录的“订阅 ID”、“资源组”、“应用程序(客户端)ID”、“目录(租户)ID”和“客户端密码”值。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.auth.type": "ServicePrincipal",
      "spark.cosmos.account.subscriptionId": "<subscription-id>",
      "spark.cosmos.account.resourceGroupName": "<resource-group-name>",
      "spark.cosmos.account.tenantId": "<entra-tenant-id>",
      "spark.cosmos.auth.aad.clientId": "<entra-app-client-id>",
      "spark.cosmos.auth.aad.clientSecret": "<entra-app-client-secret>",
      "spark.cosmos.database": "<database-name>",
      "spark.cosmos.container": "<container-name>"        
    }    
    
    // Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.auth.type" -> "ServicePrincipal",
      "spark.cosmos.account.subscriptionId" -> "<subscription-id>",
      "spark.cosmos.account.resourceGroupName" -> "<resource-group-name>",
      "spark.cosmos.account.tenantId" -> "<entra-tenant-id>",
      "spark.cosmos.auth.aad.clientId" -> "<entra-app-client-id>",
      "spark.cosmos.auth.aad.clientSecret" -> "<entra-app-client-secret>",
      "spark.cosmos.database" -> "<database-name>",
      "spark.cosmos.container" -> "<container-name>" 
    )
    
  6. 配置目录 API 以使用 Spark 来管理 API for NoSQL 资源。

    # Configure Catalog Api
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
    
    // Configure Catalog Api
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", "<nosql-account-endpoint>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "ServicePrincipal")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", "<subscription-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", "<resource-group-name>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", "<entra-tenant-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", "<entra-app-client-id>")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", "<entra-app-client-secret>")
    
  7. 使用 CREATE DATABASE IF NOT EXISTS 创建新的数据库。 确保提供数据库名称。

    # Create a database using the Catalog API
    spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format("<database-name>"))
    
    // Create a database using the Catalog API
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.<database-name>;")
    
  8. 使用指定的数据库名称、容器名称、分区键路径和吞吐量值创建新容器。

    # Create a products container using the Catalog API
    spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '{}', manualThroughput = '{}')".format("<database-name>", "<container-name>", "<partition-key-path>", "<throughput>"))
    
    // Create a products container using the Catalog API
    spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.<database-name>.<container-name> using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '<partition-key-path>', manualThroughput = '<throughput>')")
    
  9. 创建示例数据集。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  10. 使用 spark.createDataFrame 和以前保存的联机事务处理 (OLTP) 配置将示例数据添加到目标容器。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

    提示

    在此快速入门示例中,凭据以明文形式分配给变量。 但出于安全考虑,我们建议使用机密。 要详细了解如何配置机密,请参阅将机密添加到 Spark 配置中