Consultar dados do Cosmos DB com o Spark

Concluído

Depois de adicionar um serviço vinculado para o seu banco de dados do Azure Cosmos DB habilitado para repositório analítico, você poderá usá-lo para consultar os dados usando um pool do Spark em seu workspace do Azure Synapse Analytics.

Carregando dados analíticos do Azure Cosmos DB em um dataframe

Para exploração inicial ou análise rápida de dados de um serviço vinculado do Azure Cosmos DB, geralmente é mais fácil carregar dados de um contêiner em um dataframe usando uma linguagem com suporte do Spark, como PySpark (uma implementação do Python específica do Spark) ou Scala (uma linguagem baseada em Java frequentemente usada no Spark).

Por exemplo, o seguinte código PySpark pode ser usado para carregar um dataframe chamado df dos dados no contêiner my-container conectado para usar o serviço vinculado my_linked_service e exibir as dez primeiras linhas de dados:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Vamos supor que o contêiner my-container seja usado para armazenar itens semelhantes ao seguinte exemplo:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

A saída do código PySpark seria semelhante à seguinte tabela:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Os dados são carregados do repositório analítico no contêiner, não do repositório operacional; isso garante que não haja nenhuma sobrecarga de consulta no repositório operacional. Os campos no armazenamento de dados analítico incluem os campos definidos pelo aplicativo (nesse caso productID e productName) e campos de metadados criados automaticamente.

Depois de carregar o dataframe, você pode usar os métodos nativos dele para explorar os dados. Por exemplo, o seguinte código cria um dataframe contendo apenas as colunas productID e productName, ordenadas por productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

A saída desse código seria semelhante a esta tabela:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Gravar um dataframe em um contêiner do Cosmos DB

Na maioria dos cenários de HTAP, você deve usar o serviço vinculado para ler dados no Spark do repositório analítico. No entanto, você pode gravar o conteúdo de um dataframe no contêiner, conforme mostrado no seguinte exemplo:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Observação

Gravar um dataframe em um contêiner atualiza o repositório operacional e pode afetar o desempenho dele. As alterações são sincronizadas com o repositório analítico.

Usar o Spark SQL para consultar dados analíticos do Azure Cosmos DB

O Spark SQL é uma API do Spark que fornece sintaxe de linguagem SQL e semântica de banco de dados relacional em um pool do Spark. Você pode usar o Spark SQL para definir metadados para tabelas que podem ser consultadas usando SQL.

Por exemplo, o seguinte código cria uma tabela chamada Products com base no contêiner hipotético usado nos exemplos anteriores:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Dica

A palavra-chave %%sql no início do código é um magic que instrui o pool do Spark a executar o código como SQL em vez da linguagem padrão (que geralmente é definida como PySpark).

Usando essa abordagem, você pode criar um banco de dados lógico no pool do Spark que pode ser usado para consultar os dados analíticos no Azure Cosmos DB para dar suporte a cargas de trabalho de análise de dados e relatórios sem afetar o repositório operacional em sua conta do Azure Cosmos DB.