Kopieren von Daten aus Azure Cosmos DB in einen dedizierten SQL-Pool mithilfe von Apache Spark

Azure Synapse Link für Azure Cosmos DB ermöglicht Benutzern das Ausführen von Analysen in Quasi-Echtzeit über operative Daten in Azure Cosmos DB. Manchmal müssen einige Daten für die Nutzung durch Data Warehouse-Benutzer jedoch aggregiert und angereichert werden. Azure Synapse Link-Daten können mit einigen wenigen Zellen in einem Notebook zusammengestellt und exportiert werden.

Voraussetzungen

Schritte

In diesem Tutorial stellen Sie eine Verbindung mit dem Analysespeicher her, sodass der Transaktionsspeicher nicht beeinträchtigt wird (es werden keine Anforderungseinheiten beansprucht). Wir durchlaufen die folgenden Schritte:

  1. Einlesen des Azure Cosmos DB-HTAP-Containers in einen Spark-Datenrahmen
  2. Aggregieren der Ergebnisse in einem neuen Datenrahmen
  3. Erfassen der Daten in einem dedizierten SQL-Pool

Schritte für Spark zu SQL 1

Daten

In diesem Beispiel verwenden wir einen HTAP-Container namens RetailSales. Er ist Teil eines verknüpften Diensts namens ConnectedData und hat das folgende Schema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Die Verkäufe (quantity, revenue (Preis × Menge) werden zur Berichterstellung nach productCode und weekStarting aggregiert. Schließlich werden diese Daten in eine Tabelle im dedizierten SQL-Pool mit dem Namen dbo.productsales exportiert.

Konfigurieren eines Spark-Notebooks

Erstellen Sie ein Spark-Notebook mit Scala as Spark (Scala) als Hauptsprache. Für die Sitzung wird die Standardeinstellung des Notebooks verwendet.

Lesen der Daten in Spark

Lesen Sie den Azure Cosmos DB-HTAP-Container mit Spark in einem Datenrahmen in der ersten Zelle.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Aggregieren der Ergebnisse in einem neuen Datenrahmen

In der zweiten Zelle führen Sie die Transformation und Aggregate aus, die für den neuen Datenrahmen benötigt werden, bevor er in eine Datenbank im dedizierten SQL-Pool geladen wird.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Laden der Ergebnisse in einen dedizierten SQL-Pool

In der dritten Zelle werden die Daten in einen dedizierten SQL-Pool geladen. Es werden automatisch eine temporäre externe Tabelle, eine externe Datenquelle und ein externes Dateiformat erstellt, die nach Abschluss des Auftrags gelöscht werden.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Abfragen der Ergebnisse mit SQL

Sie können das Ergebnis mithilfe einer einfachen SQL-Abfrage abfragen, etwa mit dem folgenden SQL-Skript:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Ihre Abfrage zeigt die folgenden Ergebnisse in einem Diagrammmodus an: Schritte für Spark zu SQL 2

Nächste Schritte