Tutorial: Executar seu primeiro pipeline do Delta Live Tables

Este tutorial mostra como você deve configurar um pipeline de dados do Delta Live Tables pelo código em um bloco de anotações do Databricks e iniciar uma atualização. Este tutorial inclui um pipeline de exemplo para ingerir e processar um conjunto de dados de exemplo com o código de exemplo usando as interfaces Python e SQL. Você também pode usar as instruções neste tutorial para criar um pipeline com blocos de anotações com sintaxe do Delta Live Tables definidas corretamente.

Você pode configurar os pipelines do Delta Live Tables e iniciar atualizações usando a interface do usuário do workspace do Azure Databricks ou as opções de ferramentas automatizadas, como a API, CLI, Pacotes de Ativos do Databricks, ou como uma tarefa em um fluxo de trabalho do Databricks. Para se familiarizar com a funcionalidade e os recursos do Delta Live Tables, o Databricks recomenda primeiro usar a interface do usuário para criar e executar os pipelines. Além disso, quando você configura um pipeline na interface do usuário, o Delta Live Tables gera uma configuração JSON no seu pipeline que pode ser usada para implementar seus fluxos de trabalho programáticos.

Para demonstrar a funcionalidade do Delta Live Tables, os exemplos neste tutorial baixam um conjunto de dados disponível publicamente. Porém, o Databricks tem várias maneiras de se conectar às fontes de dados e ingerir os dados que serão usados pelos pipelines que implementam casos de uso do mundo real. Consulte Ingerir dados com o Delta Live Tables.

Requisitos

  • Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster definindo um cluster do Delta Live Tables. O runtime do Delta Live Tables cria um cluster antes de executar o pipeline e falhará se você não tiver a permissão correta.

  • Para usar os exemplos neste tutorial, seu espaço de trabalho deve ter o Catálogo do Unity habilitado.

  • Você deve ter as seguintes permissões no Catálogo do Unity:

    • READ VOLUME e WRITE VOLUME, ou ALL PRIVILEGES, para o my-volume volume.
    • USE SCHEMA ou ALL PRIVILEGES para o esquema default.
    • USE CATALOG ou ALL PRIVILEGES para o catálogo main.

    Para definir essas permissões, consulte o administrador do Databricks ou Privilégios e objetos protegidos do Catálogo do Unity.

  • Os exemplos neste tutorial usam um volume do Catálogo do Unity para armazenar dados de exemplo. Para usar esses exemplos, crie um volume e use esse catálogo de volumes, esquema e nomes de volume para definir o caminho de volume usado pelos exemplos.

Observação

Se seu espaço de trabalho não tiver o Catálogo do Unity habilitado, os notebooks com exemplos que não exigem o Catálogo do Unity serão anexados a este artigo. Para usar esses exemplos, selecione Hive metastore como a opção de armazenamento ao criar o pipeline.

Onde você executa as consultas do Delta Live Tables?

As consultas do Delta Live Tables são implementadas principalmente nos blocos de anotações do Databricks, mas o Delta Live Tables não foi projetado para ser executado interativamente nas células do notebook. A execução de uma célula que contém a sintaxe Delta Live Tables em um notebook do Databricks resulta em uma mensagem de erro. Para executar suas consultas, você deve configurar seus notebooks como parte de um pipeline.

Importante

  • Você não pode contar com a ordem de execução célula a célula dos notebooks ao gravar consultas no Delta Live Tables. O Delta Live Tables avalia e executa todos os códigos definidos nos notebooks, mas tem um modelo de execução diferente do comando Executar tudo de um notebook.
  • Você não pode misturar linguagens em um arquivo de código-fonte do Delta Live Tables. Por exemplo, um notebook pode conter apenas consultas Python ou consultas SQL. Se você precisar usar vários idiomas em um pipeline, use vários notebooks ou arquivos específicos da linguagem no pipeline.

Você também pode usar o código Python armazenado nos arquivos. Por exemplo, você pode criar um módulo Python que pode ser importado para seus pipelines do Python ou definir as funções definidas pelo usuário (UDFs) do Python para usar nas consultas SQL. Para saber mais sobre como importar os módulos do Python, consulte Importar os módulos do Python das pastas GIT ou dos arquivos do espaço de trabalho. Para saber mais sobre como usar as UDFs do Python, consulte Funções escalares definidas pelo usuário – Python.

Exemplo: ingerir e processar os dados de nomes de bebê de Nova York

O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Estes exemplos demonstram o uso de um pipeline do Delta Live Tables para:

  • Leia os dados CSV brutos de um conjunto de dados disponível publicamente em uma tabela.
  • Ler os registros da tabela de dados brutos e usar as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
  • Use os registros limpos como entrada para as consultas do Delta Live Tables que criam conjuntos de dados derivados.

Esse código demonstra um exemplo simplificado da arquitetura medallion. Confira O que é a arquitetura medallion do Lakehouse?.

As implementações deste exemplo são fornecidas para as interfaces do Python e SQL. Você pode seguir as etapas para criar novos notebookss que contenham o código de exemplo ou pode pular para Criar um pipeline e usar um dos notebooks fornecidos nesta página.

implementar um pipeline de Tabelas Dinâmicas Delta com Python

O código Python que cria conjuntos de dados delta live tables deve retornar DataFrames. Para usuários que não estão familiarizados com Python e DataFrames, o Databricks recomenda usar a interface SQL. Consulte Implementar um pipeline do Delta Live Tables com SQL.

Todas as APIs do Python do Delta Live Tables são implementadas no módulo dlt. O código de pipeline do Delta Live Tables implementado com o Python deve importar explicitamente o módulo dlt na parte superior dos notebooks e arquivos do Python. O Delta Live Tables difere de muitos scripts Python de uma maneira fundamental: você não chama as funções que executam a ingestão e a transformação de dados para criar conjuntos de dados do Delta Live Tables. Em vez disso, o Delta Live Tables interpreta as funções decoradoras do módulo dlt em todos os arquivos carregados em um pipeline e cria um grafo de fluxo de dados.

Para implementar o exemplo neste tutorial, copie e cole o seguinte código Python em um novo notebook Python. Adicione cada snippet de código de exemplo à sua própria célula no notebook na ordem descrita. Para ler as opções para a criação de notebooks, confira Criar um notebook.

Quando você cria um pipeline com a interface de Python, por padrão, os nomes de tabela são definidos por nomes de função. Por exemplo, o exemplo de Python a seguir cria três tabelas chamadas baby_names_raw, baby_names_prepared e top_baby_names_2021. Você pode substituir o nome da tabela usando o parâmetro name. Confira Criar uma exibição materializada ou tabela de streaming do Delta Live Tables.

Importante

Para evitar um comportamento inesperado quando o pipeline é executado, não inclua código que possa ter efeitos colaterais em suas funções que definem conjuntos de dados. Para saber mais, confira a referência do Python.

Importar o módulo Delta Live Tables

Todas as APIs do Python do Delta Live Tables são implementadas no módulo dlt. Importe explicitamente o módulo dlt na parte superior dos notebooks e arquivos do Python.

O exemplo a seguir mostra essa importação, juntamente com instruções de importação para pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Baixar os dados

Para obter os dados deste exemplo, baixe um arquivo CSV e armazene-os no volume da seguinte maneira:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes do catálogo, esquema e volume de um volume do Catálogo do Unity.

Criar uma tabela partir de arquivos no armazenamento de objetos

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.

O decorador @dlt.table informa ao Delta Live Tables para criar uma tabela que contenha o resultado de uma DataFrame retornada por uma função. Adicione o decorador @dlt.table antes de qualquer definição de função do Python que retorne um DataFrame do Spark para registrar uma nova tabela no Delta Live Tables. O exemplo a seguir demonstra o uso do nome da função como o nome da tabela e a inclusão de um comentário descritivo à tabela:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Adicionar uma tabela de um conjunto de dados upstream no pipeline

Você pode usar dlt.read() para ler dados de outros conjuntos de dados declarados no seu pipeline atual do Delta Live Tables. Declarar novas tabelas dessa forma cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar atualizações. O código a seguir também inclui exemplos de monitoramento e implementação da qualidade de dados com expectativas. Confira Gerenciar a qualidade dos dados com o Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Criar uma tabela com exibições de dados enriquecidas

Como o Delta Live Tables processa atualizações para os pipelines como uma série de grafos de dependência, você pode declarar exibições altamente enriquecidas que alimentam painéis de controle, business intelligence e análises de dados ao declarar tabelas com uma lógica de negócios específica.

As tabelas do Delta Live Tables são conceitualmente equivalentes às exibições materializadas. Ao contrário das exibições tradicionais no Spark que executam a lógica sempre que a exibição é consultada, uma tabela Delta Live Tables armazena a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia atualizações para todos os conjuntos de dados em um pipeline, você pode agendar atualizações de pipeline para corresponder aos requisitos de latência para exibições materializadas e saber que as consultas nessas tabelas contêm a versão mais recente dos dados disponíveis.

A tabela definida pelo código a seguir demonstra a semelhança conceitual com uma exibição materializada derivada de dados upstream em seu pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Para configurar um pipeline que usa o notebook, consulte Criar um pipeline.

Implementar um pipeline do Delta Live Tables com o SQL .

O Databricks recomenda o Delta Live Tables com SQL como a maneira preferencial para os usuários do SQL criarem novos pipelines de ETL, ingestão e transformação no Azure Databricks. A interface do SQL no Delta Live Tables estende o SQL do Spark padrão com muitas novas palavras-chave, constructos e funções com valor de tabela. Esses acréscimos ao SQL padrão permitem que os usuários declarem dependências entre os conjuntos de dados e implantem a infraestrutura no nível de produção sem a necessidade de aprenderem como usar novas ferramentas ou conceitos adicionais.

Para os usuários familiarizados com o Spark DataFrames e que precisam de suporte para testes mais extensos e operações que são difíceis de implementar com o SQL, como operações de metaprogramação, o Databricks recomenda usr a interface Python. Consulte Implementar um pipeline de Tabelas Dinâmicas Delta com o Python.

Baixar os dados

Para obter os dados deste exemplo, copie o código a seguir, cole-os em um novo notebook e execute o notebook. Para ler as opções para a criação de notebooks, confira Criar um notebook.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes do catálogo, esquema e volume de um volume do Catálogo do Unity.

Criar uma tabela com base nos arquivos no Catálogo do Unity

No restante deste exemplo, copie os snippets de SQL a seguir e cole-os em um novo notebook SQL, separado do notebook da seção anterior. Adicione cada exemplo de snippet de SQL à sua própria célula no notebook na ordem descrita.

O Delta Live Tables dá suporte ao carregamento de dados de todos os formatos compatíveis com o Azure Databricks. Confira Opções de formato de arquivo.

Todas as instruções SQL do Delta Live Tables usam a sintaxe e a semântica CREATE OR REFRESH. Quando você atualiza um pipeline, o Delta Live Tables determina se o resultado logicamente correto para a tabela pode ser obtido por meio de um processamento adicional ou se é preciso recalcular totalmente.

O exemplo a seguir cria uma tabela carregando os dados do arquivo CSV armazenado no volume do Catálogo do Unity:

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes do catálogo, esquema e volume de um volume do Catálogo do Unity.

Adicionar ao pipeline uma tabela de um conjunto de dados upstream

Você pode usar o esquema virtual live para consultar dados de outros conjuntos de dados declarados no seu pipeline atual do Delta Live Tables. Declarar novas tabelas dessa forma cria uma dependência que o Delta Live Tables resolve automaticamente antes de executar atualizações. O esquema live é uma palavra-chave personalizada implementada no Delta Live Tables que pode ser substituída por um esquema de destino se você quiser publicar seus conjuntos de dados. Consulte Usar o Catálogo do Unity com seus pipelines do Delta Live Tables e Publicar os dados de pipelines do Delta Live Tables no metastore do Hive.

O código a seguir também inclui exemplos de monitoramento e implementação da qualidade de dados com expectativas. Confira Gerenciar a qualidade dos dados com o Delta Live Tables.

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Criar uma exibição de dados enriquecidos

Devido ao fato de o Delta Live Tables processar atualizações para os pipelines como uma série de grafos de dependência, você pode declarar exibições altamente enriquecidas que alimentam painéis de controle, business intelligence e análises de dados ao declarar tabelas com uma lógica de negócios específica.

A consulta a seguir usa uma exibição materializada para criar uma exibição enriquecida com base nos dados upstream. Ao contrário das exibições tradicionais no Spark que executam a lógica sempre que a exibição é consultada, as exibições materializadas armazenam a versão mais recente dos resultados da consulta em arquivos de dados. Como o Delta Live Tables gerencia atualizações para todos os conjuntos de dados em um pipeline, você pode agendar as atualizações de pipelines para corresponderem aos requisitos de latência para exibições materializadas e ter a certeza de que as consultas a essas tabelas contêm a versão mais recente dos dados disponíveis.

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Para configurar um pipeline que usa o notebook, continue a Criar um pipeline.

Criar um pipeline

Observação

  • Como os recursos de computação são totalmente gerenciados para pipelines DLT sem servidor, as configurações de computação não estão disponíveis quando você seleciona Sem servidor para um pipeline.
  • Para obter informações sobre qualificação e habilitação para pipelines DLT sem servidor, consulte Habilitar computação sem servidor.

O Delta Live Tables cria pipelines resolvendo dependências definidas em notebooks ou arquivos (chamados de código-fonte ou bibliotecas) usando a sintaxe do Delta Live Tables. Cada arquivo de código-fonte pode conter apenas uma linguagem, mas você pode misturar bibliotecas de diferentes linguagens em seu pipeline.

  1. Clique em Delta Live Tables na barra lateral e clique em Criar Pipeline.
  2. Dê um nome ao pipeline.
  3. (Opcional) Para executar o pipeline usando pipelines DLT sem servidor, use a caixa de seleção Sem servidor. Quando você seleciona Sem servidor, as configurações da Computação são removidas da interface do usuário. Confira Criar pipelines totalmente gerenciados usando Delta Live Tables com a computação sem servidor.
  4. (Opcional) Selecione uma edição do produto.
  5. Selecione Disparado por para o Modo de Pipeline.
  6. Configure um ou mais notebooks que contenham o código-fonte do pipeline. Na caixa de texto Caminhos, insira o caminho para um notebook ou clique em Ícone do Seletor de Arquivos para selecionar um notebook.
  7. Selecione um destino para os conjuntos de dados publicados pelo pipeline, seja o metastore do Hive ou o Catálogo do Unity. Confira Publicar conjuntos de dados.
    • Metastore do Hive:
      • (Opcional) Insira um local de armazenamento para dados de saída do pipeline. O sistema usará um local padrão se você deixar o Armazenamento local vazio.
      • (Opcional) Especifique um Esquema de destino para publicar seu conjunto de dados no metastore do Hive.
    • Catálogo do Unity: especifique um Catálogo e um Esquema de destino para publicar seu conjunto de dados no Catálogo do Unity.
  8. (Opcional) Se você não tiver selecionado Sem servidor, poderá definir as configurações de computação para o pipeline. Para saber mais sobre as opções de configurações de computação, confira Definir as configurações de pipeline para o Delta Live Tables.
  9. (Opcional) Clique em Adicionar notificação para configurar um ou mais endereços de email para receber notificações para eventos de pipeline. Confira Adicionar notificações por email para eventos de pipeline.
  10. (Opcional) Defina as configurações avançadas do pipeline. Para saber mais sobre as opções de configurações avançadas, confira Definir as configurações do pipeline para o Delta Live Tables.
  11. Clique em Criar.

A página Detalhes do Pipeline é exibida depois que você clica em Criar. Você também pode acessar seu pipeline clicando no nome do pipeline na guia Delta Live Tables.

Iniciar uma atualização de pipeline

Para iniciar uma atualização para um pipeline, clique no botão Ícone Iniciar Delta Live Tables no painel superior. O sistema retorna uma mensagem confirmando que o pipeline está iniciando.

Depois de iniciar com êxito a atualização, o sistema do Delta Live Tables:

  1. Inicia um cluster usando uma configuração de cluster criada pelo sistema do Delta Live Tables. Também é possível especificar uma configuração de cluster personalizada.
  2. Cria todas as tabelas que não existem e garante que o esquema está correto para todas as tabelas existentes.
  3. Atualiza as tabelas com os dados mais recentes disponíveis.
  4. Desliga o cluster quando a atualização é concluída.

Observação

O modo de execução é definido como Produção por padrão, que implanta recursos de computação efêmera para cada atualização. Você pode usar o modo de desenvolvimento para alterar esse comportamento, permitindo que os mesmos recursos de computação sejam usados para várias atualizações de pipeline durante o desenvolvimento e o teste. Confira os Modos de desenvolvimento e produção.

Publicar conjuntos de dados

Você pode disponibilizar conjuntos de dados delta live tables para consulta publicando tabelas no metastore do Hive ou no Catálogo do Unity. Se você não especificar um destino para a publicação de dados, as tabelas criadas nos pipelines do Delta Live Tables só poderão ser acessadas por outras operações nesse mesmo pipeline. Confira Publicar dados do Delta Live Tables no metastore do Hive e Usar o Catálogo do Unity com seus pipelines do Delta Live Tables.

Notebooks de código-fonte de exemplo

Você pode importar esses notebooks para o workspace do Azure Databricks e usá-los para implantar um pipeline do Delta Live Tables. Confira Criar um pipeline.

Introdução ao notebook Python do Delta Live Tables

Obter notebook

Introdução ao notebook SQL do Delta Live Tables

Obter notebook

Exemplo de notebooks de código-fonte para espaços de trabalho sem o Catálogo do Unity

Você pode importar esses notebooks para um workspace do Azure Databricks sem o Catálogo do Unity habilitado e usá-los para implantar um pipeline do Delta Live Tables. Confira Criar um pipeline.

Introdução ao notebook Python do Delta Live Tables

Obter notebook

Introdução ao notebook SQL do Delta Live Tables

Obter notebook