Carregar dados com o Delta Live Tables

Você pode carregar dados de qualquer fonte de dados compatível com o Apache Spark no Azure Databricks usando o Delta Live Tables. Você pode definir conjuntos de dados (tabelas e exibições) no Delta Live Tables em relação a qualquer consulta que retorne um DataFrame do Spark, incluindo DataFrames de streaming e Pandas para DataFrames do Spark. Para tarefas de ingestão de dados, o Databricks recomenda usar tabelas de streaming para a maioria dos casos de uso. As tabelas de streaming são boas para assimilar dados do armazenamento de objetos da nuvem usando o Auto Loader ou de barramentos de mensagens como o Kafka. Os exemplos abaixo demonstram alguns padrões comuns.

Importante

Nem todas as fontes de dados têm suporte a SQL. Você pode misturar notebooks SQL e Python em um pipeline do Delta Live Tables para usar o SQL para todas as operações além da ingestão.

Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas em Tabelas Dinâmicas Delta por padrão, consulte Gerenciar dependências do Python para pipelines de Tabelas Dinâmicas Delta.

Carregar arquivos do armazenamento de objetos da nuvem

O Databricks recomenda o uso do Auto Loader para ingestão de dados incremental do armazenamento de objetos de nuvem. O Auto Loader e o Delta Live Tables são projetados para carregar dados cada vez maiores de forma incremental e idempotente à medida que chegam ao armazenamento em nuvem. Os exemplos a seguir usam o Carregador automático para criar conjuntos de dados dos arquivos CSV e JSON:

Observação

Para carregar arquivos com o Carregador Automático em um pipeline habilitado para o Catálogo do Unity, você deve usar localizações externas. Para saber mais sobre como usar o Catálogo do Unity com o Delta Live Tables, confira Usar o Catálogo do Unity com seus pipelines do Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Consulte O que é Auto Loader? e Sintaxe SQL do Auto Loader.

Aviso

Se você usar o Auto Loader com notificações de arquivo e executar uma atualização completa para seu pipeline ou tabela de streaming, deverá limpar manualmente os recursos. Você pode usar o CloudFilesResourceManager em um notebook para executar a limpeza.

Carregar dados de um barramento de mensagem

Você pode configurar pipelines do Delta Live Tables para ingerir dados de barramentos de mensagens com tabelas de streaming. O Databricks recomenda combinar tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência de barramentos de mensagens. Consulte Otimizar a utilização do cluster de pipelines do Delta Live Tables com dimensionamento automático aprimorado.

Por exemplo, o código a seguir configura uma tabela de streaming para ingerir dados do Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Você pode gravar operações downstream em SQL puro para executar transformações de streaming nesses dados, como no exemplo a seguir:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Para obter um exemplo de como trabalhar com Hubs de Eventos, consulte Usar Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables.

Confira Configurar fontes de dados de streaming.

Carregar dados de locais externos

O Delta Live Tables dá suporte ao carregamento de dados de qualquer fonte de dados com suporte do Azure Databricks. Confira Conectar-se a fontes de dados. O Databricks recomenda o carregamento de dados externos usando a Federação Lakehouse para fontes de dados com suporte. Como a Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou superior, para usar a Lakehouse Federation, o seu pipeline deve ser configurado para usar o canal de pré-visualização.

Algumas fontes de dados não têm suporte equivalente no SQL. Se você não puder usar o Lakehouse Federation com uma dessas fontes de dados, poderá usar um bloco de anotações Python para ingerir dados da origem. Você pode adicionar código-fonte Python e SQL ao mesmo pipeline do Delta Live Tables. O exemplo a seguir declara uma exibição materializada para acessar o estado atual dos dados em uma tabela remota do PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos de nuvem

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Delta Live Tables oferece suporte a todos os formatos de arquivo suportados pelo Apache Spark no Azure Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Observação

A construção SQL SELECT * FROM format.`path`; é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL com Delta Live Tables.

Credenciais de armazenamento de acesso com segurança com segredos em um pipeline

Você pode usar segredos do Azure Databricks para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade Spark na configuração de cluster das configurações de pipeline. Consulte Configurar a computação para um pipeline do Delta Live Tables.

O exemplo a seguir usa um segredo para armazenar uma chave de acesso necessária para leitura de dados de entrada de uma conta de armazenamento do ADLS Gen2 (Azure Data Lake Storage Gen2) usando o Carregador Automático. Você pode usar esse mesmo método para configurar um segredo exigido pelo pipeline, por exemplo, chaves AWS para acessar o S3 ou a senha para um metastore do Hive do Apache.

Para saber mais sobre como trabalhar com o Azure Data Lake Storage Gen2, confira Conectar ao Azure Data Lake Storage Gen2 e o Armazenamento de Blobs.

Observação

Você deve adicionar o prefixo spark.hadoop. à chave de configuração spark_conf que define o valor secreto.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Substitua

  • <storage-account-name> pelo nome da conta de armazenamento do ADLS Gen2.
  • <scope-name> pelo nome do escopo de segredo do Azure Databricks.
  • <secret-name> pelo nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Substitua

  • <container-name> com o nome do contêiner da conta de armazenamento do Azure que armazena os dados de entrada.
  • <storage-account-name> pelo nome da conta de armazenamento do ADLS Gen2.
  • <path-to-input-dataset> com o caminho para o conjunto de dados de entrada.

Carregar dados dos Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com o Apache Kafka. Você pode usar o conector Kafka de Streaming Estruturado, incluído no runtime do Delta Live Tables, para carregar mensagens dos Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, confira Usar os Hubs de Eventos do Azure como uma fonte de dados do Delta Live Tables.