Muster für strukturiertes Streaming in Azure Databricks

Dies beinhaltet Notebooks und Codebeispiele für gängige Muster für die Arbeit mit strukturiertem Streaming auf Azure Databricks.

Erste Schritte mit strukturiertem Streaming

Wenn Sie mit strukturiertem Streaming noch nicht vertraut sind, finden Sie weitere Informationen unter Ausführen Ihrer ersten Workload für strukturiertes Streaming.

Schreiben in Cassandra als Senke für strukturiertes Streaming in Python

Apache Cassandra ist eine verteilte, latenzarme, skalierbare und hochverfügbare OLTP-Datenbank.

Strukturiertes Streaming funktioniert mit Cassandra über den Spark Cassandra-Connector. Dieser Connector unterstützt sowohl RDD- als auch DataFrame-APIs und bietet native Unterstützung für das Schreiben von Streamingdaten. Wichtig Sie müssen die entsprechende Version von spark-cassandra-connector-assembly verwenden.

Im folgenden Beispiel wird eine Verbindung mit einem oder mehreren Hosts in einem Cassandra-Datenbankcluster hergestellt. Außerdem werden Verbindungskonfigurationen wie der Ort des Prüfpunkts und die spezifischen Keyspace- und Tabellennamen angegeben:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Schreiben in Azure Synapse Analytics mithilfe von foreachBatch() in Python

streamingDF.writeStream.foreachBatch() ermöglicht Ihnen die Wiederverwendung vorhandener Batchdatenschreiber, um die Ausgabe einer Streamingabfrage in Azure Synapse Analytics zu schreiben. Einzelheiten erfahren Sie in der Dokumentation zu foreachBatch.

Um dieses Beispiel auszuführen, benötigen Sie den Azure Synapse Analytics-Connector. Ausführliche Informationen zum Azure Synapse Analytics-Connector finden Sie unter Abfragen von Daten in Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream-Joins

Diese beiden Notebooks zeigen, wie Sie Stream-Stream-Joins in Python und Scala verwenden.

Stream-Stream-Joins in Python-Notebook

Notebook abrufen

Stream-Stream-Joins in Scala-Notebook

Notebook abrufen