Kopieren von Daten aus Azure Cosmos DB in einen dedizierten SQL-Pool mithilfe von Apache Spark
Azure Synapse Link für Azure Cosmos DB ermöglicht Benutzern das Ausführen von Analysen in Quasi-Echtzeit über operative Daten in Azure Cosmos DB. Manchmal müssen einige Daten für die Nutzung durch Data Warehouse-Benutzer jedoch aggregiert und angereichert werden. Azure Synapse Link-Daten können mit einigen wenigen Zellen in einem Notebook zusammengestellt und exportiert werden.
Voraussetzungen
- Bereitstellen eines Synapse-Arbeitsbereichs mit:
- Bereitstellen eines Azure Cosmos DB-Kontos mit einem HTAP-Container mit Daten
- Herstellen einer Verbindung zwischen dem Azure Cosmos DB-HTAP-Container mit dem Arbeitsbereich
- Einrichten des richtigen Setups zum Importieren von Daten in einen dedizierten SQL-Pool aus Spark
Schritte
In diesem Tutorial stellen Sie eine Verbindung mit dem Analysespeicher her, sodass der Transaktionsspeicher nicht beeinträchtigt wird (es werden keine Anforderungseinheiten beansprucht). Wir durchlaufen die folgenden Schritte:
- Einlesen des Azure Cosmos DB-HTAP-Containers in einen Spark-Datenrahmen
- Aggregieren der Ergebnisse in einem neuen Datenrahmen
- Erfassen der Daten in einem dedizierten SQL-Pool
Daten
In diesem Beispiel verwenden wir einen HTAP-Container namens RetailSales. Er ist Teil eines verknüpften Diensts namens ConnectedData und hat das folgende Schema:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
Die Verkäufe (quantity, revenue (Preis × Menge) werden zur Berichterstellung nach productCode und weekStarting aggregiert. Schließlich werden diese Daten in eine Tabelle im dedizierten SQL-Pool mit dem Namen dbo.productsales
exportiert.
Konfigurieren eines Spark-Notebooks
Erstellen Sie ein Spark-Notebook mit Scala as Spark (Scala) als Hauptsprache. Für die Sitzung wird die Standardeinstellung des Notebooks verwendet.
Lesen der Daten in Spark
Lesen Sie den Azure Cosmos DB-HTAP-Container mit Spark in einem Datenrahmen in der ersten Zelle.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Aggregieren der Ergebnisse in einem neuen Datenrahmen
In der zweiten Zelle führen Sie die Transformation und Aggregate aus, die für den neuen Datenrahmen benötigt werden, bevor er in eine Datenbank im dedizierten SQL-Pool geladen wird.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Laden der Ergebnisse in einen dedizierten SQL-Pool
In der dritten Zelle werden die Daten in einen dedizierten SQL-Pool geladen. Es werden automatisch eine temporäre externe Tabelle, eine externe Datenquelle und ein externes Dateiformat erstellt, die nach Abschluss des Auftrags gelöscht werden.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Abfragen der Ergebnisse mit SQL
Sie können das Ergebnis mithilfe einer einfachen SQL-Abfrage abfragen, etwa mit dem folgenden SQL-Skript:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Ihre Abfrage zeigt die folgenden Ergebnisse in einem Diagrammmodus an: