Verwenden von „foreachBatch” zum Schreiben in beliebige Datensenken

In diesem Artikel wird die Verwendung von foreachBatch mit strukturiertem Streaming erläutert, um die Ausgabe einer Streamingabfrage in Datenquellen zu schreiben, die über keine vorhandene Streamingsenke verfügen.

Mit dem Codemuster streamingDF.writeStream.foreachBatch(...) können Sie Batchfunktionen auf die Ausgabedaten der einzelnen Mikrobatches der Streamingabfrage anwenden. Funktionen, die mit foreachBatch verwendet werden, akzeptieren zwei Parameter:

  • Ein DataFrame, der die Ausgabedaten eines Mikrobatches enthält.
  • Die eindeutige ID des Mikrobatches.

Sie müssen für Delta Lake-Zusammenführungsvorgänge bei strukturiertem Streaming foreachBatch verwenden. Weitere Informationen finden Sie unter Upsert aus Streamingabfragen mittels foreachBatch.

Anwenden zusätzlicher DataFrame-Vorgänge

Viele DataFrame- und Dataset-Vorgänge werden in Streaming-DataFrames nicht unterstützt, da Spark die Erstellung inkrementeller Pläne in diesen Fällen nicht unterstützt. Mit foreachBatch() können Sie einige dieser Vorgänge auf jede Mikrobatchausgabe anwenden. Sie können zum Beispiel foreachBath() und den SQL-Vorgang MERGE INTO verwenden, um die Ausgabe von Streamingaggregationen in eine Deltatabelle im Aktualisierungsmodus zu schreiben. Weitere Informationen finden Sie unter MERGE INTO.

Wichtig

  • foreachBatch() bietet nur At-Least-Once-Schreibgarantien. Sie können jedoch das der Funktion zur Verfügung gestellte batchId verwenden, um die Ausgabe zu deduplizieren und eine Exactly-Once-Garantie zu erhalten. In beiden Fällen müssen Sie selbst über die End-to-End-Semantik entscheiden.
  • foreachBatch() funktioniert nicht mit dem kontinuierlichen Verarbeitungsmodus, da er im Wesentlichen auf der Mikrobatchausführung einer Streamingabfrage beruht. Wenn Sie Daten im kontinuierlichen Modus schreiben, verwenden Sie stattdessen foreach().

Ein leerer Datenrahmen kann mit foreachBatch() aufgerufen werden, und der Benutzercode muss resilient sein, um einen ordnungsgemäßen Betrieb zu ermöglichen. Das folgende Beispiel soll dies erläutern:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Verhaltensänderungen bei foreachBatch in der Databricks Runtime 14.0

In Databricks Runtime 14.0 und höher bei der Berechnung, die mit dem Modus für gemeinsam genutzten Zugriff konfiguriert ist, gelten die folgenden Verhaltensänderungen:

  • print()-Befehle schreiben ihre Ausgabe in die Treiberprotokolle.
  • Sie können nicht auf das dbutils.widgets Untermodul innerhalb der Funktion zugreifen.
  • Alle Dateien, Module oder Objekte, auf die in der Funktion verwiesen wird, müssen serialisierbar und in Spark verfügbar sein.

Wiederverwenden vorhandener Batchdatenquellen

Mit foreachBatch() können Sie vorhandene Batchdaten-Writer für Datensenken verwenden, die möglicherweise keine Unterstützung für strukturiertes Streaming bieten. Hier sind einige Beispiele:

Viele andere Batchdatenquellen können von foreachBatch() aus verwendet werden. Weitere Informationen finden Sie unter Herstellen von Verbindungen mit Datenquellen.

Schreiben an mehrere Speicherorte

Wenn Sie die Ausgabe einer Streamingabfrage an mehrere Speicherorte schreiben müssen, empfiehlt Databricks die Verwendung mehrerer Writer für strukturiertes Streaming, um eine optimale Parallelisierung und einen optimalen Durchsatz zu erzielen.

Die Verwendung foreachBatch zum Schreiben in mehrere Senken serialisiert die Ausführung von Streamingschreibvorgängen, was die Wartezeit für jeden Mikrobatch erhöhen kann.

Wenn Sie zum Schreiben in mehrere Deltatabellen foreachBatch verwenden, lesen Sie die Informationen unter Idempotente Schreibvorgänge in Tabellen in foreachBatch.