Tutoriel : exécuter votre premier pipeline Delta Live Tables

Ce tutoriel vous guide tout au long des étapes permettant de configurer votre premier pipeline Delta Live Tables, d’écrire du code ETL de base et d’exécuter une mise à jour de pipeline.

Toutes les étapes de ce didacticiel sont conçues pour les espaces de travail avec Unity Catalog activé. Vous pouvez également configurer des pipelines Delta Live Tables pour qu’ils fonctionnent avec le metastore Hive hérité. Consultez Utiliser des pipelines Delta Live Tables avec un metastore Hive hérité.

Remarque

Ce didacticiel contient des instructions pour développer et valider du nouveau code de pipeline à l’aide de notebooks Databricks. Vous pouvez également configurer des pipelines à l’aide du code source dans des fichiers Python ou SQL.

Vous pouvez configurer un pipeline pour exécuter votre code si vous avez déjà écrit du code source à l’aide de la syntaxe Delta Live Tables. Consultez Configurer un pipeline Delta Live Tables.

Vous pouvez utiliser une syntaxe SQL entièrement déclarative dans Databricks SQL pour inscrire et définir des planifications d’actualisation pour les vues matérialisées et les tables de diffusion en continu en tant qu’objets gérés par le catalogue Unity. Consultez Utiliser des vues matérialisées dans Databricks SQL et charger des données à l’aide de tables de diffusion en continu dans Databricks SQL.

Exemple : ingérer et traiter les données sur les prénoms des bébés à New York

L’exemple de cet article utilise un jeu de données disponible publiquement qui contient des enregistrements de prénoms de bébés dans l’État de New York. Cet exemple illustre l’utilisation d’un pipeline Delta Live Tables pour :

  • Lit les données CSV brutes d’un volume dans une table.
  • Lisez les enregistrements de la table d’ingestion et utilisez les attentes delta Live Tables pour créer une table qui contient des données nettoyées.
  • Utiliser les enregistrements nettoyés comme entrées pour les requêtes Delta Live Tables qui créent des jeux de données dérivés.

Ce code illustre un exemple simplifié de l’architecture de médaillon. Consultez Qu’est-ce que l’architecture de médaillon dans un lakehouse ?.

Les implémentations de cet exemple sont fournies pour Python et SQL. Suivez les étapes pour créer un pipeline et un bloc-notes, puis copiez-collez le code fourni.

Des exemples de notebooks avec du code complet sont également fournis.

Spécifications

  • Pour démarrer un pipeline, vous devez disposer de l’autorisation de création de cluster ou d’un accès à une stratégie de cluster définissant un cluster Delta Live Tables. Le runtime Delta Live Tables crée un cluster avant d’exécuter votre pipeline et échoue si vous n’avez pas l’autorisation appropriée.

  • Tous les utilisateurs peuvent déclencher des mises à jour à l’aide de pipelines serverless par défaut. Serverless doit être activé au niveau du compte et peut ne pas être disponible dans votre région d’espace de travail. Voir Activer le calcul serverless.

  • Les exemples de ce didacticiel utilisent Le catalogue Unity. Databricks recommande de créer un schéma pour exécuter ce didacticiel, car plusieurs objets de base de données sont créés dans le schéma cible.

    • Pour créer un schéma dans un catalogue, vous devez disposer ALL PRIVILEGES ou USE CATALOG CREATE SCHEMA des privilèges.
    • Si vous ne pouvez pas créer de schéma, exécutez ce didacticiel sur un schéma existant. Vous devez disposer des privilèges suivants :
      • USE CATALOG pour le catalogue parent.
      • ALL PRIVILEGES ou USE SCHEMA, CREATE MATERIALIZED VIEWet CREATE TABLE privilèges sur le schéma cible.
    • Ce tutoriel utilise un volume pour stocker des exemples de données. Databricks recommande de créer un volume pour ce didacticiel. Si vous créez un schéma pour ce didacticiel, vous pouvez créer un volume dans ce schéma.
      • Pour créer un volume dans un schéma existant, vous devez disposer des privilèges suivants :
        • USE CATALOG pour le catalogue parent.
        • ALL PRIVILEGES ou et USE SCHEMA CREATE VOLUME des privilèges sur le schéma cible.
      • Vous pouvez éventuellement utiliser un volume existant. Vous devez disposer des privilèges suivants :
        • USE CATALOG pour le catalogue parent.
        • USE SCHEMA pour le schéma parent.
        • ALL PRIVILEGESWRITE VOLUME ou READ VOLUME sur le volume cible.

    Pour définir ces autorisations, contactez votre administrateur Databricks. Pour plus d’informations sur les privilèges du catalogue Unity, consultez privilèges de catalogue Unity et objets sécurisables.

Étape 0 : Télécharger des données

Cet exemple charge des données à partir d’un volume de catalogue Unity. Le code suivant télécharge un fichier CSV et le stocke dans le volume spécifié. Ouvrez un nouveau notebook et exécutez le code suivant pour télécharger ces données sur le volume spécifié :

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog. Le code fourni tente de créer le schéma et le volume spécifiés si ces objets n’existent pas. Vous devez disposer des privilèges appropriés pour créer et écrire dans des objets dans le catalogue Unity. Consultez Spécifications.

Remarque

Vérifiez que ce notebook s’exécute correctement avant de poursuivre le didacticiel. Ne configurez pas ce notebook dans le cadre de votre pipeline.

Étape 1 : Créer un pipeline

Delta Live Tables crée des pipelines en résolvant les dépendances définies dans les notebooks ou les fichiers (appelés code source) à l’aide de la syntaxe Delta Live Tables. Chaque fichier de code source ne peut contenir qu’une seule langue, mais vous pouvez ajouter plusieurs notebooks ou fichiers spécifiques à la langue dans le pipeline.

Important

Ne configurez aucune ressource dans le champ de code source . Le fait de laisser ce champ noir crée et configure un bloc-notes pour la création de code source.

Les instructions de ce didacticiel utilisent le calcul serverless et le catalogue Unity. Utilisez les paramètres par défaut pour toutes les options de configuration non mentionnées dans ces instructions.

Remarque

Si serverless n’est pas activé ou pris en charge dans votre espace de travail, vous pouvez suivre le didacticiel comme écrit à l’aide des paramètres de calcul par défaut. Vous devez sélectionner manuellement Le catalogue Unity sous options de stockage dans la section Destination de l’interface utilisateur de création d’un pipeline .

Pour configurer un nouveau pipeline, procédez comme suit :

  1. Cliquez sur Delta Live Tables dans la barre latérale.
  2. Cliquez sur Créer un pipeline.
  3. Fournissez un nom de pipeline unique.
  4. Cochez la case en regard de Serverless.
  5. Sélectionnez un catalogue pour publier des données.
  6. Sélectionnez un schéma dans le catalogue.
    • Spécifiez un nouveau nom de schéma pour créer un schéma.
  7. Définissez trois paramètres de pipeline à l’aide du bouton Ajouter une configuration sous Avancé pour ajouter trois configurations. Spécifiez le catalogue, le schéma et le volume vers lesquels vous avez téléchargé des données à l’aide des noms de paramètres suivants :
    • my_catalog
    • my_schema
    • my_volume
  8. Cliquez sur Créer.

L’interface utilisateur des pipelines s’affiche pour le pipeline nouvellement créé. Un notebook de code source est automatiquement créé et configuré pour le pipeline.

Le bloc-notes est créé dans un répertoire dans votre répertoire utilisateur. Le nom du nouveau répertoire et du nouveau fichier correspondent au nom de votre pipeline. Par exemple : /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Un lien permettant d’accéder à ce bloc-notes se trouve sous le champ Code source dans le panneau détails du pipeline. Cliquez sur le lien pour ouvrir le bloc-notes avant de passer à l’étape suivante.

Étape 2 : Déclarer des vues matérialisées et des tables de streaming dans un notebook avec Python ou SQL

Vous pouvez utiliser des notebooks Datbricks pour développer et valider de manière interactive le code source pour les pipelines Delta Live Tables. Vous devez attacher votre notebook au pipeline pour utiliser cette fonctionnalité. Pour attacher votre bloc-notes nouvellement créé au pipeline que vous venez de créer :

  1. Cliquez sur Se connecter en haut à droite pour ouvrir le menu de configuration du calcul.
  2. Pointez sur le nom du pipeline que vous avez créé à l’étape 1.
  3. Cliquez sur Connecter.

L’interface utilisateur change pour inclure les boutons Valider et Démarrer en haut à droite. Pour en savoir plus sur la prise en charge des notebooks pour le développement de code de pipeline, consultez Développer et déboguer des pipelines Delta Live Tables dans les notebooks.

Important

  • Les pipelines Delta Live Tables évaluent toutes les cellules d’un notebook pendant la planification. Contrairement aux notebooks exécutés sur un calcul à usage unique ou planifiés en tant que travaux, les pipelines ne garantissent pas que les cellules s’exécutent dans l’ordre spécifié.
  • Les notebooks ne peuvent contenir qu’un seul langage de programmation. Ne mélangez pas le code Python et SQL dans les notebooks de code source de pipeline.

Pour plus d’informations sur le développement de code avec Python ou SQL, consultez Développer du code de pipeline avec Python ou Développer du code de pipeline avec SQL.

Exemple de code de pipeline

Pour implémenter l’exemple dans ce tutoriel, copiez et collez le code suivant dans une cellule du notebook configuré comme code source pour votre pipeline.

Le code fourni effectue les opérations suivantes :

  • Importe les modules nécessaires (Python uniquement).
  • Référence les paramètres définis lors de la configuration du pipeline.
  • Définit une table de diffusion en continu nommée baby_names_raw qui ingère à partir d’un volume.
  • Définit une vue matérialisée nommée baby_names_prepared qui valide les données ingérées.
  • Définit une vue matérialisée nommée top_baby_names_2021 qui a une vue hautement affinée des données.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Étape 3 : Démarrer une mise à jour de pipeline

Pour démarrer une mise à jour du pipeline, cliquez sur le bouton Démarrer en haut à droite de l’interface utilisateur du bloc-notes.

Exemples de notebooks

Les blocs-notes suivants contiennent les mêmes exemples de code fournis dans cet article. Ces notebooks ont les mêmes exigences que les étapes décrites dans cet article. Consultez Spécifications.

Pour importer un bloc-notes, procédez comme suit :

  1. Ouvrez l’interface utilisateur du bloc-notes.
    • Cliquez sur + Nouveau>bloc-notes.
    • Un bloc-notes vide s’ouvre.
  2. Cliquez sur Fichier>Importer. La boîte de dialogue Importer s’affiche.
  3. Sélectionnez l’option URL à partir de laquelle Importer.
  4. Collez l’URL du bloc-notes.
  5. Cliquez sur Importer.

Ce tutoriel nécessite l’exécution d’un notebook de configuration des données avant de configurer et d’exécuter votre pipeline Delta Live Tables. Importez le bloc-notes suivant, attachez le bloc-notes à une ressource de calcul, renseignez la variable requise pour my_catalog, my_schemaet my_volumecliquez sur Exécuter tout.

Téléchargement des données pour le didacticiel sur les pipelines

Obtenir le notebook

Les notebooks suivants fournissent des exemples dans Python ou SQL. Lorsque vous importez un bloc-notes, il est enregistré dans votre répertoire d’accueil utilisateur.

Après avoir importé l’un des blocs-notes ci-dessous, effectuez les étapes de création d’un pipeline, mais utilisez le sélecteur de fichiers de code source pour sélectionner le bloc-notes téléchargé. Après avoir créé le pipeline avec un notebook configuré comme code source, cliquez sur Démarrer dans l’interface utilisateur du pipeline pour déclencher une mise à jour.

Notebook Python de prise en main de Delta Live Tables

Obtenir le notebook

Notebook SQL de prise en main de Delta Live Tables

Obtenir le notebook

Exemple de notebooks de code source pour les espaces de travail sans Unity Catalog

Remarque

Si votre espace de travail n’est pas activé pour Unity Catalog, les notebooks avec des exemples qui ne nécessitent pas Unity Catalog sont attachés à cet article. Pour utiliser ces exemples, sélectionnez Hive metastore comme option de stockage lorsque vous créez le pipeline.

Vous pouvez importer ces notebooks dans un espace de travail Azure Databricks sans Unity Catalog activé et les utiliser pour déployer un pipeline Delta Live Tables. Voir l’étape 1 : Créer un pipeline.

Notebook Python de prise en main de Delta Live Tables

Obtenir le notebook

Notebook SQL de prise en main de Delta Live Tables

Obtenir le notebook