Esercitazione: Connettersi ad Azure Cosmos DB per NoSQL usando Spark
SI APPLICA A: NoSQL
In questa esercitazione si usa il connettore Spark di Azure Cosmos DB per leggere o scrivere dati da un account Azure Cosmos DB per NoSQL. Questa esercitazione usa Azure Databricks e un notebook Jupyter per illustrare come eseguire l'integrazione con l'API per NoSQL da Spark. Questa esercitazione è incentrata su Python e Scala, anche se è possibile usare qualsiasi linguaggio o interfaccia supportata da Spark.
In questa esercitazione apprenderai a:
- Connettersi a un account API per NoSQL usando Spark e un notebook jupyter.
- Creare risorse di database e contenitore.
- Inserire dati nel contenitore.
- Eseguire query sui dati nel contenitore.
- Eseguire operazioni comuni sugli elementi nel contenitore.
Prerequisiti
- Un account Azure Cosmos DB for NoSQL già presente.
- Se si ha già una sottoscrizione di Azure, creare un nuovo account.
- Se non si ha una sottoscrizione di Azure, è possibile provare Azure Cosmos DB gratuitamente senza necessità di carta di credito.
- Un'area di lavoro di Azure Databricks esistente.
Connettersi con Spark e Jupyter
Usare l'area di lavoro di Azure Databricks esistente per creare un cluster di calcolo pronto per usare Apache Spark 3.4.x per connettersi all'account Azure Cosmos DB per NoSQL.
Aprire l'area di lavoro di Azure Databricks.
Nell'interfaccia dell'area di lavoro creare un nuovo cluster. Configurare il cluster con queste impostazioni, almeno:
Versione Valore Versione di runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Usare l'interfaccia dell'area di lavoro per cercare i pacchetti Maven da Maven Central con un ID gruppo di
com.azure.cosmos.spark
. Installare il pacchetto in modo specifico per Spark 3.4 con un ID artefatto precedutoazure-cosmos-spark_3-4
dal prefisso nel cluster.Infine, creare un nuovo notebook.
Suggerimento
Per impostazione predefinita, il notebook è collegato al cluster creato di recente.
Nel notebook impostare le impostazioni di configurazione OLTP (Online Transaction Processing) per l'endpoint dell'account NoSQL, il nome del database e il nome del contenitore.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Creare un database e un contenitore
Usare l'API catalogo per gestire le risorse dell'account, ad esempio database e contenitori. È quindi possibile usare OLTP per gestire i dati all'interno delle risorse del contenitore.
Configurare l'API catalogo per gestire le risorse API per NoSQL usando Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Creare un nuovo database denominato
cosmicworks
usandoCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Creare un nuovo contenitore denominato
products
usandoCREATE TABLE IF NOT EXISTS
. Assicurarsi di impostare il percorso della chiave di partizione su/category
e abilitare la velocità effettiva di scalabilità automatica con una velocità effettiva massima di1000
unità richiesta (UR) al secondo.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Creare un altro contenitore denominato
employees
usando una configurazione gerarchica della chiave di partizione. Usare/organization
,/department
e/team
come set di percorsi di chiave di partizione. Seguire l'ordine specifico. Impostare anche la velocità effettiva su una quantità manuale di400
UR.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Eseguire le celle del notebook per verificare che il database e i contenitori vengano creati all'interno dell'account API per NoSQL.
Inserire i dati
Creare un set di dati di esempio. Usare quindi OLTP per inserire tali dati nel contenitore API per NoSQL.
Creare un set di dati di esempio.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Usare
spark.createDataFrame
e la configurazione OLTP salvata in precedenza per aggiungere dati di esempio al contenitore di destinazione.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Eseguire query sui dati
Caricare dati OLTP in un frame di dati per eseguire query comuni sui dati. È possibile usare varie sintassi per filtrare o eseguire query sui dati.
Usare
spark.read
per caricare i dati OLTP in un oggetto frame di dati. Usare la stessa configurazione usata in precedenza in questa esercitazione. Impostare anche suspark.cosmos.read.inferSchema.enabled
true
per consentire al connettore Spark di dedurre lo schema eseguendo il campionamento degli elementi esistenti.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Eseguire il rendering dello schema dei dati caricati nel frame di dati usando
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Eseguire il rendering delle righe di dati in cui la colonna
quantity
è minore di20
. Usare le funzioniwhere
eshow
per eseguire questa query.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Eseguire il rendering della prima riga di dati in cui la
clearance
colonna ètrue
. Usare la funzionefilter
per eseguire questa query.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Eseguire il rendering di cinque righe di dati senza filtro o troncamento. Usare la funzione
show
per personalizzare l'aspetto e il numero di righe di cui viene eseguito il rendering.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Eseguire query sui dati usando questa stringa di query NoSQL non elaborata:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Eseguire operazioni comuni
Quando si usa l'API per i dati NoSQL in Spark, è possibile eseguire aggiornamenti parziali o usare i dati come JSON non elaborato.
Per eseguire un aggiornamento parziale di un elemento:
Copiare la variabile di configurazione esistente
config
e modificare le proprietà nella nuova copia. In particolare, configurare la strategia di scrittura inItemPatch
. Disabilitare quindi il supporto in blocco. Impostare le colonne e le operazioni mappate. Impostare infine il tipo di operazione predefinito suSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Creare variabili per la chiave di partizione dell'elemento e l'identificatore univoco di destinazione come parte di questa operazione di patch.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Creare un set di oggetti patch per specificare l'elemento di destinazione e specificare i campi da modificare.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Creare un frame di dati usando il set di oggetti patch. Usare
write
per eseguire l'operazione di patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Eseguire una query per esaminare i risultati dell'operazione di patch. L'elemento dovrebbe ora essere denominato
Yamba New Surfboard
senza altre modifiche.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Per usare dati JSON non elaborati:
Copiare la variabile di configurazione esistente
config
e modificare le proprietà nella nuova copia. In particolare, modificare il contenitore di destinazione inemployees
. Configurare quindi la colonna/campo per l'usocontacts
di dati JSON non elaborati.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Creare un set di dipendenti da inserire nel contenitore.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Creare un frame di dati e usare
write
per inserire i dati dei dipendenti.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Eseguire il rendering dei dati dal frame di dati usando
show
. Osservare che la colonnacontacts
è un JSON non elaborato nell'output.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenuto correlato
- Apache Spark
- API catalogo di Azure Cosmos DB
- Informazioni di riferimento sul parametro di configurazione
- Esempi di connettori Spark di Azure Cosmos DB
- Eseguire la migrazione da Spark 2.4 a Spark 3.*
- Compatibilità delle versioni:
- Note sulla versione:
- Collegamenti per il download: