Esercitazione: Connettersi ad Azure Cosmos DB per NoSQL usando Spark

SI APPLICA A: NoSQL

In questa esercitazione si usa il connettore Spark di Azure Cosmos DB per leggere o scrivere dati da un account Azure Cosmos DB per NoSQL. Questa esercitazione usa Azure Databricks e un notebook Jupyter per illustrare come eseguire l'integrazione con l'API per NoSQL da Spark. Questa esercitazione è incentrata su Python e Scala, anche se è possibile usare qualsiasi linguaggio o interfaccia supportata da Spark.

In questa esercitazione apprenderai a:

  • Connettersi a un account API per NoSQL usando Spark e un notebook jupyter.
  • Creare risorse di database e contenitore.
  • Inserire dati nel contenitore.
  • Eseguire query sui dati nel contenitore.
  • Eseguire operazioni comuni sugli elementi nel contenitore.

Prerequisiti

  • Un account Azure Cosmos DB for NoSQL già presente.
  • Un'area di lavoro di Azure Databricks esistente.

Connettersi con Spark e Jupyter

Usare l'area di lavoro di Azure Databricks esistente per creare un cluster di calcolo pronto per usare Apache Spark 3.4.x per connettersi all'account Azure Cosmos DB per NoSQL.

  1. Aprire l'area di lavoro di Azure Databricks.

  2. Nell'interfaccia dell'area di lavoro creare un nuovo cluster. Configurare il cluster con queste impostazioni, almeno:

    Versione Valore
    Versione di runtime 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Usare l'interfaccia dell'area di lavoro per cercare i pacchetti Maven da Maven Central con un ID gruppo di com.azure.cosmos.spark. Installare il pacchetto in modo specifico per Spark 3.4 con un ID artefatto preceduto azure-cosmos-spark_3-4 dal prefisso nel cluster.

  4. Infine, creare un nuovo notebook.

    Suggerimento

    Per impostazione predefinita, il notebook è collegato al cluster creato di recente.

  5. Nel notebook impostare le impostazioni di configurazione OLTP (Online Transaction Processing) per l'endpoint dell'account NoSQL, il nome del database e il nome del contenitore.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Creare un database e un contenitore

Usare l'API catalogo per gestire le risorse dell'account, ad esempio database e contenitori. È quindi possibile usare OLTP per gestire i dati all'interno delle risorse del contenitore.

  1. Configurare l'API catalogo per gestire le risorse API per NoSQL usando Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Creare un nuovo database denominato cosmicworks usando CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Creare un nuovo contenitore denominato products usando CREATE TABLE IF NOT EXISTS. Assicurarsi di impostare il percorso della chiave di partizione su /category e abilitare la velocità effettiva di scalabilità automatica con una velocità effettiva massima di 1000 unità richiesta (UR) al secondo.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Creare un altro contenitore denominato employees usando una configurazione gerarchica della chiave di partizione. Usare /organization, /departmente /team come set di percorsi di chiave di partizione. Seguire l'ordine specifico. Impostare anche la velocità effettiva su una quantità manuale di 400 UR.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Eseguire le celle del notebook per verificare che il database e i contenitori vengano creati all'interno dell'account API per NoSQL.

Inserire i dati

Creare un set di dati di esempio. Usare quindi OLTP per inserire tali dati nel contenitore API per NoSQL.

  1. Creare un set di dati di esempio.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Usare spark.createDataFrame e la configurazione OLTP salvata in precedenza per aggiungere dati di esempio al contenitore di destinazione.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Eseguire query sui dati

Caricare dati OLTP in un frame di dati per eseguire query comuni sui dati. È possibile usare varie sintassi per filtrare o eseguire query sui dati.

  1. Usare spark.read per caricare i dati OLTP in un oggetto frame di dati. Usare la stessa configurazione usata in precedenza in questa esercitazione. Impostare anche su spark.cosmos.read.inferSchema.enabled true per consentire al connettore Spark di dedurre lo schema eseguendo il campionamento degli elementi esistenti.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Eseguire il rendering dello schema dei dati caricati nel frame di dati usando printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Eseguire il rendering delle righe di dati in cui la colonna quantity è minore di 20. Usare le funzioni where e show per eseguire questa query.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Eseguire il rendering della prima riga di dati in cui la clearance colonna è true. Usare la funzione filter per eseguire questa query.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Eseguire il rendering di cinque righe di dati senza filtro o troncamento. Usare la funzione show per personalizzare l'aspetto e il numero di righe di cui viene eseguito il rendering.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Eseguire query sui dati usando questa stringa di query NoSQL non elaborata: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Eseguire operazioni comuni

Quando si usa l'API per i dati NoSQL in Spark, è possibile eseguire aggiornamenti parziali o usare i dati come JSON non elaborato.

  1. Per eseguire un aggiornamento parziale di un elemento:

    1. Copiare la variabile di configurazione esistente config e modificare le proprietà nella nuova copia. In particolare, configurare la strategia di scrittura in ItemPatch. Disabilitare quindi il supporto in blocco. Impostare le colonne e le operazioni mappate. Impostare infine il tipo di operazione predefinito su Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Creare variabili per la chiave di partizione dell'elemento e l'identificatore univoco di destinazione come parte di questa operazione di patch.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Creare un set di oggetti patch per specificare l'elemento di destinazione e specificare i campi da modificare.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Creare un frame di dati usando il set di oggetti patch. Usare write per eseguire l'operazione di patch.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Eseguire una query per esaminare i risultati dell'operazione di patch. L'elemento dovrebbe ora essere denominato Yamba New Surfboard senza altre modifiche.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Per usare dati JSON non elaborati:

    1. Copiare la variabile di configurazione esistente config e modificare le proprietà nella nuova copia. In particolare, modificare il contenitore di destinazione in employees. Configurare quindi la colonna/campo per l'uso contacts di dati JSON non elaborati.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Creare un set di dipendenti da inserire nel contenitore.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Creare un frame di dati e usare write per inserire i dati dei dipendenti.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Eseguire il rendering dei dati dal frame di dati usando show. Osservare che la colonna contacts è un JSON non elaborato nell'output.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Passaggio successivo