Kurz: Připojení ke službě Azure Cosmos DB for NoSQL pomocí Sparku
PLATÍ PRO: NoSQL
V tomto kurzu použijete konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu dat z účtu Azure Cosmos DB for NoSQL. Tento kurz používá Azure Databricks a poznámkový blok Jupyter k ilustraci integrace s rozhraním API pro NoSQL ze Sparku. Tento kurz se zaměřuje na Python a Scala, i když můžete použít libovolný jazyk nebo rozhraní podporované Sparkem.
V tomto kurzu se naučíte:
- Připojte se k účtu ROZHRANÍ API pro NoSQL pomocí Sparku a poznámkového bloku Jupyter.
- Vytvořte prostředky databáze a kontejneru.
- Příjem dat do kontejneru
- Dotazování dat v kontejneru
- Provádění běžných operací s položkami v kontejneru
Požadavky
- Existující účet Azure Cosmos DB for NoSQL.
- Pokud máte existující předplatné Azure, vytvořte nový účet.
- Žádné předplatné Azure? Službu Azure Cosmos DB můžete vyzkoušet zdarma bez nutnosti platební karty.
- Existující pracovní prostor Azure Databricks.
Připojení pomocí Sparku a Jupyteru
Pomocí stávajícího pracovního prostoru Azure Databricks vytvořte výpočetní cluster připravený k použití Apache Sparku 3.4.x pro připojení k vašemu účtu Azure Cosmos DB for NoSQL.
Otevřete pracovní prostor Azure Databricks.
V rozhraní pracovního prostoru vytvořte nový cluster. Nakonfigurujte cluster s těmito nastaveními minimálně:
Verze Hodnota Verze modulu runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Pomocí rozhraní pracovního prostoru vyhledejte balíčky Maven z Maven Central s ID
com.azure.cosmos.spark
skupiny . Nainstalujte balíček speciálně pro Spark 3.4 s ID artefaktu předponouazure-cosmos-spark_3-4
clusteru.Nakonec vytvořte nový poznámkový blok.
Tip
Ve výchozím nastavení je poznámkový blok připojený k nedávno vytvořenému clusteru.
V poznámkovém bloku nastavte nastavení konfigurace online zpracování transakcí (OLTP) pro koncový bod účtu NoSQL, název databáze a název kontejneru.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Vytvoření databáze a kontejneru
Rozhraní API katalogu slouží ke správě prostředků účtu, jako jsou databáze a kontejnery. Pak můžete použít OLTP ke správě dat v rámci prostředků kontejneru.
Nakonfigurujte rozhraní API katalogu pro správu prostředků API pro NoSQL pomocí Sparku.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Vytvořte novou databázi s názvem
cosmicworks
pomocí příkazuCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Vytvořte nový kontejner pojmenovaný
products
pomocí .CREATE TABLE IF NOT EXISTS
Ujistěte se, že jste nastavili cestu/category
k klíči oddílu a povolili propustnost automatického1000
škálování s maximální propustností jednotek žádostí (RU) za sekundu.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Vytvořte jiný kontejner s názvem
employees
pomocí hierarchie konfigurace klíče oddílu. Použijte/organization
,/department
a/team
jako sadu cest klíče oddílu. Postupujte podle tohoto konkrétního pořadí. Také nastavte propustnost na ruční množství400
RU.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Spuštěním buněk poznámkového bloku ověřte, že se vaše databáze a kontejnery vytvářejí v rámci účtu rozhraní API pro NoSQL.
Ingestace dat
Vytvořte ukázkovou datovou sadu. Pak pomocí OLTP ingestujte tato data do kontejneru API for NoSQL.
Vytvořte ukázkovou datovou sadu.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
K přidání ukázkových dat do cílového kontejneru použijte
spark.createDataFrame
dříve uloženou konfiguraci OLTP.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Zadávání dotazů na data
Načtěte data OLTP do datového rámce, abyste mohli provádět běžné dotazy na data. K filtrování nebo dotazování dat můžete použít různé syntaxe.
Slouží
spark.read
k načtení dat OLTP do objektu datového rámce. Použijte stejnou konfiguraci, kterou jste použili dříve v tomto kurzu. Také nastavtespark.cosmos.read.inferSchema.enabled
, abytrue
konektor Spark mohl odvodit schéma vzorkováním existujících položek.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Vykreslení schématu dat načtených v datovém rámci pomocí
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Vykreslujte řádky dat, ve kterých
quantity
je sloupec menší než20
.where
K provedení tohoto dotazu použijte funkce ashow
funkce.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Vykreslí první řádek dat, ve
clearance
kterém jetrue
sloupec .filter
K provedení tohoto dotazu použijte funkci.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Vykreslí pět řádků dat bez filtru nebo zkrácení.
show
Pomocí funkce můžete přizpůsobit vzhled a počet vykreslených řádků.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Dotazování dat pomocí tohoto nezpracovaného řetězce dotazu NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Provádění běžných operací
Při práci s daty API for NoSQL ve Sparku můžete provádět částečné aktualizace nebo pracovat s daty jako nezpracovaným kódem JSON.
Provedení částečné aktualizace položky:
Zkopírujte existující
config
konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně nakonfigurujte strategii zápisu naItemPatch
. Pak hromadnou podporu zakažte. Nastavte sloupce a mapované operace. Nakonec nastavte výchozí typ operace naSet
hodnotu .# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Vytvořte proměnné pro klíč oddílu položky a jedinečný identifikátor, na který chcete cílit jako součást této operace opravy.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Vytvořte sadu objektů oprav, které určí cílovou položku a určí pole, která se mají upravit.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Vytvořte datový rámec pomocí sady objektů patch. Slouží
write
k provedení operace opravy.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Spuštěním dotazu zkontrolujte výsledky operace opravy. Položka by teď měla být pojmenovaná
Yamba New Surfboard
bez dalších změn.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Práce s nezpracovanými daty JSON:
Zkopírujte existující
config
konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně změňte cílový kontejner naemployees
. Potom nakonfigurujtecontacts
sloupec nebo pole tak, aby používal nezpracovaná data JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Vytvořte sadu zaměstnanců, kteří se mají ingestovat do kontejneru.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Vytvořte datový rámec a použijte
write
ingestování dat zaměstnanců.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Vykreslení dat z datového rámce pomocí
show
. Všimněte si, že ve výstupucontacts
je nezpracovaný JSON.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Související obsah
- Apache Spark
- Rozhraní API katalogu Služby Azure Cosmos DB
- Referenční informace o parametrech konfigurace
- Ukázky konektoru Spark pro Azure Cosmos DB
- Migrace ze Sparku 2.4 na Spark 3.*
- Kompatibilita verzí:
- Poznámky:
- Odkazy ke stažení: