Tutorial: Conectar-se ao Azure Cosmos DB para NoSQL usando o Spark

APLICA-SE A: NoSQL

Neste tutorial, você usa o conector Azure Cosmos DB Spark para ler ou gravar dados de uma conta do Azure Cosmos DB para NoSQL. Este tutorial usa o Azure Databricks e um bloco de anotações Jupyter para ilustrar como integrar com a API para NoSQL do Spark. Este tutorial se concentra em Python e Scala, embora você possa usar qualquer linguagem ou interface suportada pelo Spark.

Neste tutorial, irá aprender a:

  • Conecte-se a uma API para conta NoSQL usando o Spark e um bloco de anotações Jupyter.
  • Crie recursos de banco de dados e contêiner.
  • Ingerir dados para o recipiente.
  • Consultar dados no contêiner.
  • Execute operações comuns em itens no contêiner.

Pré-requisitos

Conecte-se usando o Spark e o Jupyter

Use seu espaço de trabalho existente do Azure Databricks para criar um cluster de computação pronto para usar o Apache Spark 3.4.x para se conectar à sua conta do Azure Cosmos DB para NoSQL.

  1. Abra seu espaço de trabalho do Azure Databricks.

  2. Na interface do espaço de trabalho, crie um novo cluster. Configure o cluster com estas configurações, no mínimo:

    Versão Value
    Versão em tempo de execução 13,3 LTS (Scala 2.12, Faísca 3.4.1)
  3. Use a interface do espaço de trabalho para procurar pacotes Maven do Maven Central com uma ID de Grupo de com.azure.cosmos.spark. Instale o pacote especificamente para o Spark 3.4 com uma ID de artefato prefixada com azure-cosmos-spark_3-4 o cluster.

  4. Por fim, crie um novo bloco de anotações.

    Gorjeta

    Por padrão, o bloco de anotações é anexado ao cluster criado recentemente.

  5. No bloco de anotações, defina as definições de configuração OLTP (processamento de transações online) para o ponto de extremidade da conta NoSQL, o nome do banco de dados e o nome do contêiner.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Criar uma base de dados e um contentor

Use a API de catálogo para gerenciar recursos de conta, como bancos de dados e contêineres. Em seguida, você pode usar OLTP para gerenciar dados dentro dos recursos de contêiner.

  1. Configure a API de catálogo para gerenciar a API para recursos NoSQL usando o Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Crie um novo banco de dados nomeado cosmicworks usando CREATE DATABASE IF NOT EXISTSo .

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Crie um novo contêiner nomeado products usando CREATE TABLE IF NOT EXISTS. Certifique-se de definir o caminho da chave de partição e /category habilitar a taxa de transferência de dimensionamento automático com uma taxa de transferência máxima de unidades de 1000 solicitação (RUs) por segundo.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Crie outro contêiner nomeado employees usando uma configuração de chave de partição hierárquica. Use /organization, /departmente como /team o conjunto de caminhos de chave de partição. Siga essa ordem específica. Além disso, defina a taxa de transferência para uma quantidade manual de 400 RUs.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Execute as células do bloco de anotações para validar se o banco de dados e os contêineres foram criados na API para a conta NoSQL.

Ingerir dados

Crie um conjunto de dados de exemplo. Em seguida, use OLTP para ingerir esses dados para a API para contêiner NoSQL.

  1. Crie um conjunto de dados de exemplo.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Use spark.createDataFrame e a configuração OLTP salva anteriormente para adicionar dados de amostra ao contêiner de destino.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Consultar os dados

Carregue dados OLTP em um quadro de dados para executar consultas comuns nos dados. Você pode usar várias sintaxes para filtrar ou consultar dados.

  1. Use spark.read para carregar os dados OLTP em um objeto de quadro de dados. Use a mesma configuração usada anteriormente neste tutorial. Além disso, defina spark.cosmos.read.inferSchema.enabled como true para permitir que o conector Spark infera o esquema por amostragem de itens existentes.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Renderize o esquema dos dados carregados no quadro de dados usando printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Renderizar linhas de dados em que a quantity coluna é menor que 20. Use as where funções e show para executar essa consulta.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Renderizar a primeira linha de dados onde a clearance coluna é true. Use a filter função para executar essa consulta.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Renderize cinco linhas de dados sem filtro ou truncamento. Use a show função para personalizar a aparência e o número de linhas que são renderizadas.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Consulte seus dados usando esta cadeia de caracteres de consulta NoSQL bruta: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Executar operações comuns

Ao trabalhar com API para dados NoSQL no Spark, você pode executar atualizações parciais ou trabalhar com dados como JSON bruto.

  1. Para executar uma atualização parcial de um item:

    1. Copie a variável de configuração existente config e modifique as propriedades na nova cópia. Especificamente, configure a estratégia de gravação como ItemPatch. Em seguida, desative o suporte em massa. Defina as colunas e as operações mapeadas. Finalmente, defina o tipo de operação padrão como Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Crie variáveis para a chave de partição do item e o identificador exclusivo que você pretende direcionar como parte desta operação de patch.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Crie um conjunto de objetos de patch para especificar o item de destino e especificar campos que devem ser modificados.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Crie um quadro de dados usando o conjunto de objetos de patch. Use write para executar a operação de patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Execute uma consulta para revisar os resultados da operação de patch. O item agora deve ser nomeado Yamba New Surfboard sem outras alterações.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Para trabalhar com dados JSON brutos:

    1. Copie a variável de configuração existente config e modifique as propriedades na nova cópia. Especificamente, altere o contêiner de destino para employees. Em seguida, configure a contacts coluna/campo para usar dados JSON brutos.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Crie um conjunto de funcionários para ingerir no contêiner.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Crie um quadro de dados e use write para ingerir os dados do funcionário.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Renderize os dados do quadro de dados usando showo . Observe que a contacts coluna é JSON bruto na saída.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Próximo passo