Verwenden von Deltatabellen mit Streamingdaten
Bei allen Daten, die wir bisher erkundet haben, handelt es sich um statische Daten in Dateien. Viele Szenarien für die Datenanalyse beziehen jedoch Streamingdaten ein, die nahezu in Echtzeit verarbeitet werden müssen. Sie müssen beispielsweise Messwerte erfassen, die von IoT-Geräten (Internet of Things, Internet der Dinge) ausgegeben werden, und diese in einer Tabelle speichern, sobald sie auftreten. Spark verarbeitet Batch- und Streamingdaten auf die gleiche Weise, sodass Streamingdaten in Echtzeit mithilfe derselben API verarbeitet werden können.
Spark Structured Streaming
Eine typische Lösung für die Verarbeitung von Datenströmen sieht vor, dass Sie ständig einen Datenstrom aus einer Quelle lesen, ihn optional verarbeiten, um bestimmte Felder auszuwählen, Werte zu aggregieren und zu gruppieren oder die Daten anderweitig zu bearbeiten, und die Ergebnisse in eine Senke schreiben.
Spark bietet native Unterstützung für Streamingdaten mittels Spark Structured Streaming, einer API, die auf einem unbegrenzten DataFrame basiert, in dem Streamingdaten zur Verarbeitung erfasst werden. Ein Spark Structured Streaming-DataFrame kann Daten aus vielen verschiedenen Typen von Streamingquellen lesen, z. B.:
- Netzwerkports
- Echtzeit-Nachrichtenbrokerdienste wie Azure Event Hubs oder Kafka
- Dateisystem-Speicherorte
Tipp
Weitere Informationen zu Spark Structured Streaming finden Sie in der Spark-Dokumentation im Structured Streaming Programming Guide.
Streaming mit Delta-Tabellen
Sie können eine Delta-Tabelle als Quelle oder Senke für Spark Structured Streaming nutzen. Sie können z. B. einen Datenstrom mit Echtzeitdaten von einem IoT-Gerät erfassen und direkt in eine Delta-Tabelle als Senke schreiben. Anschließend können Sie die Tabelle abfragen, um die zuletzt gestreamten Daten anzuzeigen. Alternativ können Sie eine Delta-Tabelle als Streamingquelle lesen, sodass Berichte in Quasi-Echtzeit möglich werden, wenn der Tabelle neue Daten hinzugefügt werden.
Verwenden einer Delta-Tabelle als Streamingquelle
Im folgenden PySpark-Beispiel wird eine Delta-Tabelle erstellt, um Details zu Onlinebestellungen zu speichern:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
Ein hypothetischer Datenstrom von Onlinebestellungen wird in die Tabelle „orders_in“ eingefügt:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
Um dies zu überprüfen, können Sie Daten aus der Eingabetabelle lesen und anzeigen:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
Die Daten werden dann aus der Delta-Tabelle in einen Streaming-DataFrame geladen:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Hinweis
Wenn Sie eine Delta-Tabelle als Streamingquelle verwenden, können nur Anfügevorgänge in den Datenstrom einbezogen werden. Datenänderungen verursachen einen Fehler, es sei denn, Sie geben die Option ignoreChanges
oder ignoreDeletes
an.
Sie können überprüfen, ob Daten gestreamt werden, indem Sie die isStreaming
-Eigenschaft verwenden, die TRUE zurückgeben sollte:
# Verify that the stream is streaming
stream_df.isStreaming
Transformieren des Datenstroms
Nachdem Sie die Daten aus der Delta-Tabelle in einen Streaming-DataFrame gelesen haben, können Sie sie mit der Spark Structured Streaming-API verarbeiten. Sie können beispielsweise die Anzahl der Bestellungen zählen, die jede Minute getätigt wurden, und die aggregierten Ergebnisse an einen nachgelagerten Prozess für die Visualisierung in Quasi-Echtzeit senden.
In diesem Beispiel werden alle Zeilen mit NULL in der Spalte „Price“ gefiltert und neue Spalten für „IsBike“ und „Total“ hinzugefügt.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Verwenden einer Delta-Tabelle als Streamingsenke
Der Datenstrom wird dann in eine Delta-Tabelle geschrieben:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Hinweis
Die Option checkpointLocation
dient zum Schreiben einer Prüfpunktdatei, die den Status der Datenstromverarbeitung verfolgt. Diese Datei ermöglicht Ihnen, nach einem Ausfall an der Stelle fortzufahren, an der die Datenstromverarbeitung unterbrochen wurde.
Nachdem der Streamingprozess gestartet wurde, können Sie die Delta Lake-Tabelle abfragen, um den Inhalt der Ausgabetabelle anzuzeigen. Es kann eine kurze Verzögerung geben, bevor Sie die Tabelle abfragen können.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
In den Ergebnissen dieser Abfrage wird „Order 3005“ ausgeschlossen, da sie in der Spalte „Price“ den Wert NULL enthält. Auch die beiden Spalten, die während der Transformation hinzugefügt wurden, werden angezeigt: „IsBike“ und „Total“.
OrderID | OrderDate | Kreditor | Produkt | Menge | Preis | IsBike | Gesamt |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | Road Bike Red | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | Mountain Bike Silver | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Road Bike Yellow | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | Road Front Wheel | 1 | 115 | 0 | 115 |
Beenden Sie, wenn Sie fertig sind, das Streaming der Daten mithilfe der stop
-Methode, um unnötige Verarbeitungskosten zu vermeiden:
# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()
Tipp
Weitere Informationen zum Verwenden von Delta-Tabellen für das Streamen von Daten finden Sie in der Delta Lake-Dokumentation unter Lese- und Schreibvorgänge beim Streaming aus Tabellen.