Verwenden von Deltatabellen mit Streamingdaten

Abgeschlossen

Bei allen Daten, die wir bisher erkundet haben, handelt es sich um statische Daten in Dateien. Viele Szenarien für die Datenanalyse beziehen jedoch Streamingdaten ein, die nahezu in Echtzeit verarbeitet werden müssen. Sie müssen beispielsweise Messwerte erfassen, die von IoT-Geräten (Internet of Things, Internet der Dinge) ausgegeben werden, und diese in einer Tabelle speichern, sobald sie auftreten. Spark verarbeitet Batch- und Streamingdaten auf die gleiche Weise, sodass Streamingdaten in Echtzeit mithilfe derselben API verarbeitet werden können.

Spark Structured Streaming

Eine typische Lösung für die Verarbeitung von Datenströmen sieht vor, dass Sie ständig einen Datenstrom aus einer Quelle lesen, ihn optional verarbeiten, um bestimmte Felder auszuwählen, Werte zu aggregieren und zu gruppieren oder die Daten anderweitig zu bearbeiten, und die Ergebnisse in eine Senke schreiben.

Spark bietet native Unterstützung für Streamingdaten mittels Spark Structured Streaming, einer API, die auf einem unbegrenzten DataFrame basiert, in dem Streamingdaten zur Verarbeitung erfasst werden. Ein Spark Structured Streaming-DataFrame kann Daten aus vielen verschiedenen Typen von Streamingquellen lesen, z. B.:

  • Netzwerkports
  • Echtzeit-Nachrichtenbrokerdienste wie Azure Event Hubs oder Kafka
  • Dateisystem-Speicherorte

Tipp

Weitere Informationen zu Spark Structured Streaming finden Sie in der Spark-Dokumentation im Structured Streaming Programming Guide.

Streaming mit Delta-Tabellen

Sie können eine Delta-Tabelle als Quelle oder Senke für Spark Structured Streaming nutzen. Sie können z. B. einen Datenstrom mit Echtzeitdaten von einem IoT-Gerät erfassen und direkt in eine Delta-Tabelle als Senke schreiben. Anschließend können Sie die Tabelle abfragen, um die zuletzt gestreamten Daten anzuzeigen. Alternativ können Sie eine Delta-Tabelle als Streamingquelle lesen, sodass Berichte in Quasi-Echtzeit möglich werden, wenn der Tabelle neue Daten hinzugefügt werden.

Verwenden einer Delta-Tabelle als Streamingquelle

Im folgenden PySpark-Beispiel wird eine Delta-Tabelle erstellt, um Details zu Onlinebestellungen zu speichern:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Ein hypothetischer Datenstrom von Onlinebestellungen wird in die Tabelle „orders_in“ eingefügt:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Um dies zu überprüfen, können Sie Daten aus der Eingabetabelle lesen und anzeigen:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Die Daten werden dann aus der Delta-Tabelle in einen Streaming-DataFrame geladen:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Hinweis

Wenn Sie eine Delta-Tabelle als Streamingquelle verwenden, können nur Anfügevorgänge in den Datenstrom einbezogen werden. Datenänderungen verursachen einen Fehler, es sei denn, Sie geben die Option ignoreChanges oder ignoreDeletes an.

Sie können überprüfen, ob Daten gestreamt werden, indem Sie die isStreaming-Eigenschaft verwenden, die TRUE zurückgeben sollte:

# Verify that the stream is streaming
stream_df.isStreaming

Transformieren des Datenstroms

Nachdem Sie die Daten aus der Delta-Tabelle in einen Streaming-DataFrame gelesen haben, können Sie sie mit der Spark Structured Streaming-API verarbeiten. Sie können beispielsweise die Anzahl der Bestellungen zählen, die jede Minute getätigt wurden, und die aggregierten Ergebnisse an einen nachgelagerten Prozess für die Visualisierung in Quasi-Echtzeit senden.

In diesem Beispiel werden alle Zeilen mit NULL in der Spalte „Price“ gefiltert und neue Spalten für „IsBike“ und „Total“ hinzugefügt.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Verwenden einer Delta-Tabelle als Streamingsenke

Der Datenstrom wird dann in eine Delta-Tabelle geschrieben:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Hinweis

Die Option checkpointLocation dient zum Schreiben einer Prüfpunktdatei, die den Status der Datenstromverarbeitung verfolgt. Diese Datei ermöglicht Ihnen, nach einem Ausfall an der Stelle fortzufahren, an der die Datenstromverarbeitung unterbrochen wurde.

Nachdem der Streamingprozess gestartet wurde, können Sie die Delta Lake-Tabelle abfragen, um den Inhalt der Ausgabetabelle anzuzeigen. Es kann eine kurze Verzögerung geben, bevor Sie die Tabelle abfragen können.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

In den Ergebnissen dieser Abfrage wird „Order 3005“ ausgeschlossen, da sie in der Spalte „Price“ den Wert NULL enthält. Auch die beiden Spalten, die während der Transformation hinzugefügt wurden, werden angezeigt: „IsBike“ und „Total“.

OrderID OrderDate Kreditor Produkt Menge Preis IsBike Gesamt
3001 2023-09-01 Yang Road Bike Red 1 1200 1 1200
3002 2023-09-01 Carlson Mountain Bike Silver 1 1500 1 1500
3003 2023-09-02 Wilson Road Bike Yellow 2 1350 1 2700
3004 2023-09-02 Yang Road Front Wheel 1 115 0 115

Beenden Sie, wenn Sie fertig sind, das Streaming der Daten mithilfe der stop-Methode, um unnötige Verarbeitungskosten zu vermeiden:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

Tipp

Weitere Informationen zum Verwenden von Delta-Tabellen für das Streamen von Daten finden Sie in der Delta Lake-Dokumentation unter Lese- und Schreibvorgänge beim Streaming aus Tabellen.