Copiar dados do Azure Cosmos DB para um pool SQL dedicado usando o Apache Spark

O Azure Synapse Link for Azure Cosmos DB permite que os usuários executem análises quase em tempo real sobre dados operacionais no Azure Cosmos DB. No entanto, há momentos em que alguns dados precisam ser agregados e enriquecidos para atender aos usuários do data warehouse. A curadoria e a exportação de dados do Azure Synapse Link podem ser feitas com apenas algumas células em um bloco de anotações.

Pré-requisitos

Passos

Neste tutorial, você se conectará ao repositório analítico para que não haja impacto no repositório transacional (ele não consumirá nenhuma unidade de solicitação). Passaremos pelas seguintes etapas:

  1. Leia o contêiner HTAP do Azure Cosmos DB em um dataframe do Spark
  2. Agregar os resultados em um novo dataframe
  3. Ingerir os dados em um pool SQL dedicado

Faísca para SQL Etapas 1

Dados

Nesse exemplo, usamos um contêiner HTAP chamado RetailSales. Ele faz parte de um serviço vinculado chamado ConnectedData e tem o seguinte esquema:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantidade: longo (anulável = verdadeiro)
  • preço: longo (anulável = verdadeiro)
  • id: string (nullable = true)
  • publicidade: longo (anulável = verdadeiro)
  • storeId: long (anulável = true)
  • semanaInício: longo (anulável = verdadeiro)
  • _etag: string (nullable = true)

Vamos agregar as vendas (quantidade, receita (preço x quantidade) por productCode e weekStarting para fins de relatório. Por fim, exportaremos esses dados para uma tabela de pool SQL dedicada chamada dbo.productsales.

Configurar um Bloco de Anotações do Spark

Crie um bloco de anotações do Spark com o Scala como Spark (Scala) como idioma principal. Usamos a configuração padrão do bloco de anotações para a sessão.

Leia os dados no Spark

Leia o contêiner HTAP do Azure Cosmos DB com o Spark em um quadro de dados na primeira célula.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Agregar os resultados em um novo dataframe

Na segunda célula, executamos a transformação e as agregações necessárias para o novo dataframe antes de carregá-lo em um banco de dados de pool SQL dedicado.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Carregue os resultados em um pool SQL dedicado

Na terceira célula, carregamos os dados em um pool SQL dedicado. Ele criará automaticamente uma tabela externa temporária, fonte de dados externa e formato de arquivo externo que serão excluídos assim que o trabalho for concluído.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Consultar os resultados com SQL

Você pode consultar o resultado usando uma consulta SQL simples, como o seguinte script SQL:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Sua consulta apresentará os seguintes resultados em um modo de gráfico: Etapas 2 do Spark to SQL

Próximos passos