Verwenden von „foreachBatch” zum Schreiben in beliebige Datensenken
In diesem Artikel wird die Verwendung von foreachBatch
mit strukturiertem Streaming erläutert, um die Ausgabe einer Streamingabfrage in Datenquellen zu schreiben, die über keine vorhandene Streamingsenke verfügen.
Mit dem Codemuster streamingDF.writeStream.foreachBatch(...)
können Sie Batchfunktionen auf die Ausgabedaten der einzelnen Mikrobatches der Streamingabfrage anwenden. Funktionen, die mit foreachBatch
verwendet werden, akzeptieren zwei Parameter:
- Ein DataFrame, der die Ausgabedaten eines Mikrobatches enthält.
- Die eindeutige ID des Mikrobatches.
Sie müssen für Delta Lake-Zusammenführungsvorgänge bei strukturiertem Streaming foreachBatch
verwenden. Weitere Informationen finden Sie unter Upsert aus Streamingabfragen mittels foreachBatch.
Anwenden zusätzlicher DataFrame-Vorgänge
Viele DataFrame- und Dataset-Vorgänge werden in Streaming-DataFrames nicht unterstützt, da Spark die Erstellung inkrementeller Pläne in diesen Fällen nicht unterstützt. Mit foreachBatch()
können Sie einige dieser Vorgänge auf jede Mikrobatchausgabe anwenden. Sie können zum Beispiel foreachBath()
und den SQL-Vorgang MERGE INTO
verwenden, um die Ausgabe von Streamingaggregationen in eine Deltatabelle im Aktualisierungsmodus zu schreiben. Weitere Informationen finden Sie unter MERGE INTO.
Wichtig
foreachBatch()
bietet nur At-Least-Once-Schreibgarantien. Sie können jedoch das der Funktion zur Verfügung gestelltebatchId
verwenden, um die Ausgabe zu deduplizieren und eine Exactly-Once-Garantie zu erhalten. In beiden Fällen müssen Sie selbst über die End-to-End-Semantik entscheiden.foreachBatch()
funktioniert nicht mit dem kontinuierlichen Verarbeitungsmodus, da er im Wesentlichen auf der Mikrobatchausführung einer Streamingabfrage beruht. Wenn Sie Daten im kontinuierlichen Modus schreiben, verwenden Sie stattdessenforeach()
.
Ein leerer Datenrahmen kann mit foreachBatch()
aufgerufen werden, und der Benutzercode muss resilient sein, um einen ordnungsgemäßen Betrieb zu ermöglichen. Das folgende Beispiel soll dies erläutern:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Verhaltensänderungen bei foreachBatch
in der Databricks Runtime 14.0
In Databricks Runtime 14.0 und höher bei der Berechnung, die mit dem Modus für gemeinsam genutzten Zugriff konfiguriert ist, gelten die folgenden Verhaltensänderungen:
print()
-Befehle schreiben ihre Ausgabe in die Treiberprotokolle.- Sie können nicht auf das
dbutils.widgets
Untermodul innerhalb der Funktion zugreifen. - Alle Dateien, Module oder Objekte, auf die in der Funktion verwiesen wird, müssen serialisierbar und in Spark verfügbar sein.
Wiederverwenden vorhandener Batchdatenquellen
Mit foreachBatch()
können Sie vorhandene Batchdaten-Writer für Datensenken verwenden, die möglicherweise keine Unterstützung für strukturiertes Streaming bieten. Hier sind einige Beispiele:
Viele andere Batchdatenquellen können von foreachBatch()
aus verwendet werden. Weitere Informationen finden Sie unter Herstellen von Verbindungen mit Datenquellen.
Schreiben an mehrere Speicherorte
Wenn Sie die Ausgabe einer Streamingabfrage an mehrere Speicherorte schreiben müssen, empfiehlt Databricks die Verwendung mehrerer Writer für strukturiertes Streaming, um eine optimale Parallelisierung und einen optimalen Durchsatz zu erzielen.
Die Verwendung foreachBatch
zum Schreiben in mehrere Senken serialisiert die Ausführung von Streamingschreibvorgängen, was die Wartezeit für jeden Mikrobatch erhöhen kann.
Wenn Sie zum Schreiben in mehrere Deltatabellen foreachBatch
verwenden, lesen Sie die Informationen unter Idempotente Schreibvorgänge in Tabellen in foreachBatch.