Exécuter votre première charge de travail ETL sur Azure Databricks

Découvrez comment utiliser les outils prêts pour la production d’Azure Databricks pour développer et déployer vos premiers pipelines d’extraction, de transformation et de chargement (ETL) pour l’orchestration des données.

À la fin de cet article, vous serez en mesure d’effectuer les opérations suivantes :

  1. Lancement d’un cluster de calcul à usage unique Databricks.
  2. Créer un notebook Databricks
  3. Configuration de l’ingestion incrémentielle de données vers Delta Lake avec Auto Loader.
  4. Exécuter des cellules de notebook pour traiter, interroger et prévisualiser des données
  5. Planifier un notebook en tant que travail Databricks

Ce tutoriel utilise des notebooks interactifs pour effectuer des tâches ETL courantes en Python ou Scala.

Vous pouvez également utiliser Delta Live Tables pour générer des pipelines ETL. Databricks a créé Delta Live Tables pour réduire la complexité de la création, du déploiement et de la maintenance des pipelines ETL de production. Consulter Tutoriel : Exécuter votre premier pipeline Delta Live Tables.

Vous pouvez également utiliser le fournisseur Databricks Terraform pour créer les ressources de cet article. Consultez Créer des clusters, des notebooks et des travaux avec Terraform.

Spécifications

Notes

Si vous ne disposez pas des privilèges de contrôle de cluster, vous pouvez quand même effectuer la plupart des étapes ci-dessous tant que vous avez accès à un cluster.

Étape 1 : Créer un cluster

Pour effectuer des analyses exploratoires des données et de l’engineering données, créez un cluster permettant de fournir les ressources de calcul nécessaires à l’exécution des commandes.

  1. Cliquez sur l’icône de calcul Calcul dans la barre latérale.
  2. Sur la page Calcul, cliquez sur Créer un cluster. La page Nouveau cluster s’ouvre.
  3. Spécifiez un nom unique pour le cluster, laissez les valeurs restantes dans leur état par défaut, puis cliquez sur Créer un cluster.

Pour en savoir plus sur les clusters Databricks, consultez Calculer.

Étape 2 : Créer un notebook Databricks

Pour créer un notebook dans votre espace de travail, cliquez sur l’icône Nouveau Nouveau dans la barre latérale, puis sur Notebook. Un notebook vide s’ouvre dans l’espace de travail.

Pour en savoir plus sur la création et la gestion des notebooks, consultez Gérer les notebooks.

Étape 3 : Configurer Auto Loader pour ingérer des données dans Delta Lake

Databricks recommande d’utiliser Auto Loader pour l’ingestion incrémentielle des données. Auto Loader détecte et traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent dans le stockage d’objets cloud.

Databricks recommande de stocker les données avec Delta Lake. Delta Lake est une couche de stockage open source qui fournit des transactions ACID et active le data lakehouse. Delta Lake est le format par défaut des tables créées dans Databricks.

Pour configurer Auto Loader afin d’ingérer des données dans une table Delta Lake, copiez et collez le code suivant dans la cellule vide de votre notebook :

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Notes

Les variables définies dans ce code doivent vous permettre de l’exécuter en toute sécurité sans risque de conflit avec les ressources existantes de l’espace de travail ou avec d’autres utilisateurs. Les autorisations de réseau ou de stockage restreintes génèrent des erreurs lors de l’exécution de ce code ; contactez l’administrateur de votre espace de travail pour remédier à ces restrictions.

Pour en savoir plus sur Auto Loader, consultez Qu’est-ce que Auto Loader ?

Étape 4 : Traiter les données et interagir avec elles

Les notebooks exécutent la logique cellule par cellule. Pour exécuter la logique dans votre cellule :

  1. Pour exécuter la cellule de l’étape précédente, sélectionnez-la, puis appuyez sur MAJ+ENTRÉE.

  2. Pour interroger la table que vous venez de créer, copiez et collez le code suivant dans une cellule vide, puis appuyez sur MAJ+ENTRÉE pour exécuter la cellule.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Pour prévisualiser les données dans votre DataFrame, copiez et collez le code suivant dans une cellule vide, puis appuyez sur MAJ+ENTRÉE pour exécuter la cellule.

    Python

    display(df)
    

    Scala

    display(df)
    

Pour en savoir plus sur les options interactives de visualisation des données, consultez Visualisations dans les notebooks Databricks.

Étape 5 : Planifier un travail

Vous pouvez exécuter des notebooks Databricks en tant que scripts de production en les ajoutant comme tâche dans un travail Databricks. Au cours de cette étape, vous allez créer un travail que vous pourrez déclencher manuellement.

Pour planifier votre notebook en tant que tâche :

  1. Cliquez sur Planifier sur le côté droit de la barre d’en-tête.
  2. Entrez un nom unique dans le champ Nom du travail.
  3. Cliquez sur Manuel.
  4. Dans la liste déroulante Cluster, sélectionnez le cluster que vous avez créé à l’étape 1.
  5. Cliquez sur Créer.
  6. Dans la fenêtre qui s’affiche, cliquez sur Exécuter maintenant.
  7. Pour afficher les résultats de l’exécution du travail, cliquez sur l’icône Lien externe en regard de l’horodatage Dernière exécution.

Pour plus d’informations sur les travaux, consultez Que sont les travaux Databricks ?.

Intégrations supplémentaires

Découvrez-en plus sur les intégrations et les outils d’engineering données avec Azure Databricks :