Führen Sie Ihrem ersten strukturierten Streaming-Workload aus

Dieser Artikel enthält Codebeispiele und Erläuterungen grundlegender Konzepte, die zum Ausführen Ihrer ersten strukturierten Streaming-Abfragen auf Azure Databricks erforderlich sind. Sie können strukturiertes Streaming für nahezu Echtzeit- und inkrementelle Verarbeitungsworkloads verwenden.

Strukturiertes Streaming ist eine von mehreren Technologien, die Streamingtabellen in Delta Live-Tabellen nutzen. Databricks empfiehlt die Verwendung von Delta Live Tables für alle neuen ETL-, Aufnahme- und strukturierten Streaming-Workloads. Weitere Informationen finden Sie unter Was sind Delta Live-Tabellen?.

Hinweis

Während Delta Live Tables eine leicht geänderte Syntax zum Deklarieren von Streamingtabellen bereitstellt, gilt die allgemeine Syntax zum Konfigurieren von Streaminglesevorgängen und Transformationen für alle Streaming-Anwendungsfälle in Azure Databricks. Delta Live Tables vereinfacht auch das Streaming, indem Zustandsinformationen, Metadaten und zahlreiche Konfigurationen verwaltet werden.

Verwenden des automatischen Ladens zum Lesen von Streamingdaten aus dem Objektspeicher

Im folgenden Beispiel wird das Laden von JSON-Daten mit Auto Loader veranschaulicht, das zum Kennzeichnen von Format und Optionen cloudFiles verwendet. Die schemaLocation-Option ermöglicht Schemarückschluss und -evolution. Fügen Sie den folgenden Code in eine Zelle des Databricks-Notebooks ein und führen Sie die Zelle aus, um einen Streaming-DataFrame mit dem Namen raw_df zu erstellen:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Wie bei anderen Lesevorgängen in Azure Databricks werden beim Konfigurieren von Streaminglesedaten keine Daten geladen. Sie müssen eine Aktion für die Daten auslösen, bevor der Datenstrom beginnt.

Hinweis

Das Aufrufen von display() für einen Streaming-DataFrame startet einen Streamingauftrag. Bei den meisten Anwendungsfällen für strukturiertes Streaming sollte die Aktion, die einen Datenstrom auslöst, die Daten in eine Senke schreiben. Weitere Informationen finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.

Durchführen einer Streamingtransformation

Strukturiertes Streaming unterstützt die meisten Transformationen, die in Azure Databricks und Spark SQL verfügbar sind. Sie können sogar MLflow-Modelle als UDFs laden und Streamingvorhersagen als Transformation erstellen.

Im folgenden Codebeispiel wird eine einfache Transformation abgeschlossen, um die aufgenommenen JSON-Daten mit zusätzlichen Informationen mithilfe von Spark SQL-Funktionen zu erweitern:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Das resultierende transformed_df enthält Abfrageanweisungen zum Laden und Transformieren jedes Datensatzes, sobald er in der Datenquelle eingeht.

Hinweis

Strukturiertes Streaming behandelt Datenquellen als ungebundene oder unendliche Datasets. Daher werden einige Transformationen in strukturierten Streaming-Workloads nicht unterstützt, da sie eine unendliche Anzahl von Elementen sortieren müssen.

Die meisten Aggregationen und viele Verknüpfungen erfordern die Verwaltung von Zustandsinformationen mit Wasserzeichen, Fenstern und Ausgabemodus. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.

Durchführen eines inkrementellen Batchschreibvorgangs in Delta Lake

Im folgenden Beispiel wird mithilfe eines angegebenen Dateipfads und Prüfpunkts in Delta Lake geschrieben.

Wichtig

Stellen Sie immer sicher, dass Sie für jeden von Ihnen konfigurierten Streaming-Writer einen eindeutigen Prüfpunktspeicherort angeben. Der Prüfpunkt stellt die eindeutige Identität für Ihren Datenstrom bereit, wobei alle verarbeiteten Datensätze und Statusinformationen nachverfolgt werden, die Ihrer Streamingabfrage zugeordnet sind.

Die availableNow-Einstellung für den Trigger weist strukturiertes Streaming an, alle zuvor unverarbeiteten Datensätze aus dem Quelldatensatz zu verarbeiten und dann herunterzufahren, sodass Sie den folgenden Code sicher ausführen können, ohne sich Gedanken über das Verlassen eines Datenstroms machen zu müssen:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In diesem Beispiel werden keine neuen Datensätze in unserer Datenquelle eintreffen, sodass die Ausführung dieses Codes keine neuen Datensätze erfasst.

Warnung

Die Ausführung von strukturiertem Streaming kann die automatische Beendigung vom Herunterfahren von Computeressourcen verhindern. Um unerwartete Kosten zu vermeiden, müssen Sie Streamingabfragen beenden.

Lesen von Daten aus Delta Lake, Transformation und Schreiben in Delta Lake

Delta Lake verfügt über umfangreiche Unterstützung für die Arbeit mit strukturiertem Streaming sowohl als Quelle als auch als Senke. Weitere Informationen finden Sie unter Delta-Tabelle: Streaming für Lese- und Schreibvorgänge.

Das folgende Beispiel zeigt eine Beispielsyntax, um alle neuen Datensätze aus einer Delta-Tabelle inkrementell zu laden, sie mit einer Momentaufnahme einer anderen Delta-Tabelle zu verbinden und in eine Delta-Tabelle zu schreiben:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Sie müssen über die erforderlichen Berechtigungen verfügen, um Quelltabellen zu lesen und in Zieltabellen und den angegebenen Prüfpunktspeicherort zu schreiben. Füllen Sie alle Parameter aus, die mit gewinkelten Klammern (<>) gekennzeichnet sind, indem Sie die relevanten Werte für Ihre Datenquellen und Senken verwenden.

Hinweis

Delta Live Tables bietet eine vollständig deklarative Syntax zum Erstellen von Delta Lake-Pipelines, und verwaltet Eigenschaften wie Trigger und Prüfpunkte automatisch. Weitere Informationen finden Sie unter Was sind Delta Live-Tabellen?.

Lesen von Daten aus Kafka, Transformieren und Schreiben in Kafka

Apache Kafka und andere Nachrichtenbusse bieten mit die niedrigste Latenz, die für große Datasets verfügbar ist. Sie können Azure Databricks verwenden, um Transformationen auf Daten anzuwenden, die aus Kafka aufgenommen wurden, und dann Daten zurück in Kafka schreiben.

Hinweis

Das Schreiben von Daten in den Cloudobjektspeicher erhöht zusätzlichen Latenzaufwand. Wenn Sie Daten aus einem Nachrichtenbus in Delta Lake speichern möchten, aber die niedrigste Latenz für Streaming-Workloads erfordern, empfiehlt Databricks, separate Streamingaufträge für das Aufnehmen von Daten in das Lakehouse zu konfigurieren und nahezu Echtzeittransformationen für Downstream-Nachrichtenbussenken anzuwenden.

Das folgende Codebeispiel veranschaulicht ein einfaches Muster zum Anreichern von Daten aus Kafka durch Verknüpfen mit Daten in einer Delta-Tabelle und anschließendes Zurückschreiben in Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Sie müssen über die erforderlichen Berechtigungen für den Zugriff auf Ihren Kafka-Dienst verfügen. Füllen Sie alle Parameter aus, die mit gewinkelten Klammern (<>) gekennzeichnet sind, indem Sie die relevanten Werte für Ihre Datenquellen und Senken verwenden. Siehe Stream-Verarbeitung mit Apache Kafka und Azure Databricks.