Exécuter votre première charge de travail Structured Streaming

Cet article fournit des exemples de code et une explication des concepts de base nécessaires pour exécuter vos premières requêtes Structured Streaming sur Azure Databricks. Vous pouvez utiliser Structured Streaming pour des charges de travail de traitement en quasi-temps réel et incrémentiel.

Structured Streaming est l’une des technologies qui alimentent les tables de diffusion en continu dans Delta Live Tables. Databricks recommande d’utiliser Delta Live Tables pour toutes les nouvelles charges de travail d’ETL, d’ingestion et Structured Streaming. Consultez l’article Qu’est-ce que Delta Live Tables ?.

Remarque

Alors que Delta Live Tables fournit une syntaxe légèrement modifiée pour déclarer des tables de diffusion en continu, la syntaxe générale pour la configuration des lectures et des transformations de diffusion en continu s’applique à tous les cas d’usage de diffusion en continu sur Azure Databricks. Delta Live Tables simplifie également la diffusion en continu en gérant les informations d’état, les métadonnées et de nombreuses configurations.

Utiliser Auto Loader pour lire les données de diffusion en continu à partir du stockage d’objets

L’exemple suivant illustre le chargement de données JSON avec le Auto Loader, qui utilise cloudFiles pour indiquer le format et les options. L’option schemaLocation active l’inférence et l’évolution du schéma. Collez le code suivant dans une cellule de notebook Databricks et exécutez la cellule pour créer un DataFrame de diffusion en continu nommé raw_df :

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Comme d’autres opérations de lecture sur Azure Databricks, la configuration d’une lecture de diffusion en continu ne charge pas réellement les données. Vous devez déclencher une action sur les données avant le début du flux.

Remarque

L’appel de display() sur un DataFrame de diffusion en continu démarre une tâche de diffusion en continu. Pour la plupart des cas d’usage Structured Streaming, l’action qui déclenche un flux doit écrire des données dans un récepteur. Consultez Considérations relatives à la production pour flux structuré.

Effectuer une transformation de diffusion en continu

Structured Streaming prend en charge la plupart des transformations disponibles dans Azure Databricks et Spark SQL. Vous pouvez même charger des modèles MLflow en tant que fonctions définies par l’utilisateur et effectuer des prédictions de diffusion en continu en tant que transformation.

L’exemple de code suivant effectue une transformation simple pour enrichir les données JSON ingérées avec des informations supplémentaires à l’aide des fonctions Spark SQL :

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Le transformed_df résultant contient des instructions de requête pour charger et transformer chaque enregistrement à mesure qu’il arrive dans la source de données.

Remarque

Structured Streaming traite les sources de données comme des jeux de données illimités ou infinis. Par conséquent, certaines transformations ne sont pas prises en charge dans les charges de travail Structured Streaming, car elles nécessitent le tri d’un nombre infini d’éléments.

La plupart des agrégations et de nombreuses jointures nécessitent la gestion des informations d’état avec des filigranes, des fenêtres et le mode de sortie. Consultez Appliquer des filigranes pour contrôler les seuils de traitement des données.

Effectuer une écriture par lot incrémentielle dans Delta Lake

L’exemple suivant écrit dans Delta Lake à l’aide d’un chemin d’accès et d’un point de contrôle de fichier spécifiés.

Important

Veillez toujours à spécifier un emplacement de point de contrôle unique pour chaque enregistreur de diffusion en continu que vous configurez. Le point de contrôle fournit l’identité unique de votre flux, en suivant tous les enregistrements traités et les informations d’état associés à votre requête de diffusion en continu.

Le paramètre availableNow du déclencheur indique à Structured Streaming de traiter tous les enregistrements précédemment non traités à partir du jeu de données source, puis d’arrêter, afin de pouvoir exécuter en toute sécurité le code suivant sans vous soucier de quitter un flux en cours d’exécution :

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Dans cet exemple, aucun nouvel enregistrement n’arrive dans notre source de données. Par conséquent, la répétition de l’exécution de ce code n’ingère pas de nouveaux enregistrements.

Avertissement

L’exécution de Structured Streaming peut empêcher l’arrêt automatique causé par l’arrêt des ressources de calcul. Pour éviter des coûts inattendus, veillez à arrêter les requêtes de diffusion en continu.

Lire des données à partir de Delta Lake, transformer et écrire dans Delta Lake

Delta Lake offre une prise en charge étendue de l’utilisation de Structured Streaming à la fois en tant que source et récepteur. Voir Lectures et écritures en diffusion en continu sur des tables Delta.

L’exemple suivant illustre l’exemple de syntaxe permettant de charger de manière incrémentielle tous les nouveaux enregistrements d’une table Delta, de les joindre à un instantané d’une autre table Delta et de les écrire dans une table Delta :

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Vous devez disposer des autorisations appropriées configurées pour lire les tables sources et écrire dans les tables cibles et l’emplacement de point de contrôle spécifié. Renseignez tous les paramètres indiqués avec les signes inférieur et supérieur (<>) à l’aide des valeurs pertinentes pour vos sources de données et récepteurs.

Remarque

Delta Live Tables fournit une syntaxe entièrement déclarative pour créer des pipelines Delta Lake et gère automatiquement des propriétés telles que des déclencheurs et des points de contrôle. Consultez l’article Qu’est-ce que Delta Live Tables ?.

Lire des données à partir de Kafka, transformer et écrire dans Kafka

Apache Kafka et d’autres bus de messagerie fournissent une latence parmi les plus faibles disponibles pour les jeux de données volumineux. Vous pouvez utiliser Azure Databricks pour appliquer des transformations aux données ingérées à partir de Kafka, puis réécrire des données dans Kafka.

Remarque

L’écriture de données dans le stockage d’objets cloud ajoute une surcharge de latence supplémentaire. Si vous souhaitez stocker des données à partir d’un bus de messagerie dans Delta Lake, mais que vous avez besoin de la latence la plus faible possible pour les charges de travail de diffusion en continu, Databricks recommande de configurer des travaux de diffusion en continu distincts pour ingérer des données dans le lakehouse et d’appliquer des transformations en quasi-temps réel pour les récepteurs de bus de messagerie en aval.

L’exemple de code suivant illustre un modèle simple pour enrichir des données à partir de Kafka en les joignant à des données dans une table Delta, puis réécrire dans Kafka :

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Vous devez disposer des autorisations appropriées configurées pour l’accès à votre service Kafka. Renseignez tous les paramètres indiqués avec les signes inférieur et supérieur (<>) à l’aide des valeurs pertinentes pour vos sources de données et récepteurs. Consultez Traitement de flux avec Apache Kafka et Azure Databricks.