Azure Event Hubs
Azure Event Hubs ist ein hyperskalierbarer Dienst für die Erfassung von Telemetriedaten, der Millionen von Ereignissen sammelt, transformiert und speichert. Diese verteilte Streamingplattform bietet niedrige Latenz und konfigurierbare Aufbewahrungszeiten, wodurch Sie riesige Mengen an Telemetriedaten in die Cloud einspeisen können. Über die Semantik „Veröffentlichen/Abonnieren“ können Sie die Daten verschiedener Anwendungen lesen.
In diesem Artikel wird erläutert, wie Sie strukturiertes Streaming mit Azure Event Hubs und Azure Databricks-Clustern verwenden.
Hinweis
Azure Event Hubs stellt einen mit Apache Kafka kompatiblen Endpunkt bereit, den Sie mit dem in Databricks Runtime verfügbaren Kafka-Connector für strukturiertes Streaming verwenden können, um Nachrichten von Azure Event Hubs zu verarbeiten. Databricks empfiehlt die Verwendung des Kafka-Connectors für strukturiertes Streaming, um Nachrichten von Azure Event Hubs zu verarbeiten.
Anforderungen
Informationen zur Unterstützung der aktuellen Version finden Sie unter „Neueste Versionen“ in der Infodatei zum Azure Event Hubs-Spark-Connector.
Erstellen Sie eine Bibliothek in Ihrem Azure Databricks-Arbeitsbereich mithilfe der Maven-Koordinate
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Hinweis
Dieser Connector wird regelmäßig aktualisiert, und unter Umständen ist eine aktuellere Version verfügbar. Wir empfehlen Ihnen, den aktuellen Connector aus dem Maven-Repository zu pullen.
Installieren Sie die erstellte Bibliothek in Ihrem Cluster.
Schema
Das Schema der Datensätze ist wie folgt:
Column | Type |
---|---|
body |
binary |
partition |
Zeichenfolge |
offset |
Zeichenfolge |
sequenceNumber |
lang |
enqueuedTime |
timestamp |
publisher |
Zeichenfolge |
partitionKey |
Zeichenfolge |
properties |
map[string,json] |
body
wird immer als Bytearray bereitgestellt. Verwenden Sie cast("string")
, um die Spalte body
explizit zu deserialisieren.
Konfiguration
In diesem Abschnitt werden die Konfigurationseinstellungen erläutert, die Sie für das Arbeiten mit Event Hubs benötigen.
Ausführliche Anleitungen zum Konfigurieren von strukturiertem Streaming mit Azure Event Hubs finden Sie in dem von Microsoft entwickelten Integrationsleitfaden für strukturiertes Streaming und Azure Event Hubs.
Eine ausführliche Anleitung zur Verwendung von strukturiertem Streaming finden Sie unter Streaming in Azure Databricks.
Verbindungszeichenfolge
Eine Event Hubs-Verbindungszeichenfolge ist erforderlich, um eine Verbindung mit dem Event Hubs-Dienst herzustellen. Sie können die Verbindungszeichenfolge für Ihre Event Hubs-Instanz über das Azure-Portal oder mithilfe von ConnectionStringBuilder
in der Bibliothek abrufen.
Azure-Portal
Wenn Sie die Verbindungszeichenfolge über das Azure-Portal abrufen, kann sie den EntityPath
-Schlüssel enthalten oder auch nicht. Berücksichtigen Sie dabei Folgendes:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Um eine Verbindung mit Ihrer Event Hubs-Instanz herzustellen, muss ein EntityPath
vorhanden sein. Wenn dieser nicht in Ihrer Verbindungszeichenfolge enthalten ist, machen Sie sich keine Sorgen.
Dieses Problem wird hiermit behoben:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Alternativ können Sie ConnectionStringBuilder
verwenden, um die Verbindungszeichenfolge zu erstellen.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Alle Konfigurationen im Zusammenhang mit Event Hubs erfolgen in EventHubsConf
. Zum Erstellen von EventHubsConf
müssen Sie eine Verbindungszeichenfolge übergeben:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Weitere Informationen zum Abrufen einer gültigen Verbindungszeichenfolge finden Sie unter Verbindungszeichenfolge.
Eine vollständige Liste der Konfigurationen finden Sie unter EventHubsConf. Nachfolgend ist ein Teil der Konfigurationen für den Einstieg angegeben:
Option | Wert | Standard | Abfragetyp | BESCHREIBUNG |
---|---|---|---|---|
consumerGroup |
String | „$Default“ | Streaming und Batch | Eine Consumergruppe ist eine Ansicht eines vollständigen Event Hubs. Mithilfe von Consumergruppen können mehrere verarbeitende Anwendungen jeweils eine separate Ansicht des Ereignisstreams aufweisen und den Stream unabhängig voneinander in einem unabhängigen Tempo und mit eigenen Offsets lesen. Weitere Informationen finden Sie in der Microsoft-Dokumentation. |
startingPosition |
EventPosition | Anfang des Streams | Streaming und Batch | Die Anfangsposition für Ihren Auftrag für strukturiertes Streaming. Informationen zur Reihenfolge, in der Optionen gelesen werden, finden Sie unter startingPositions. |
maxEventsPerTrigger |
lang | partitionCount * 1000 |
Streamingabfrage | Ratenbegrenzung für die maximale Anzahl von Ereignissen, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtanzahl von Ereignissen wird proportional auf Partitionen mit unterschiedlichen Volumes aufgeteilt. |
Für jede Option gibt es eine entsprechende Einstellung in EventHubsConf
. Beispiele:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
ermöglicht Benutzern das Angeben der Anfangsposition (und Endposition) mit der EventPosition
-Klasse. EventPosition
definiert die Position eines Ereignisses in einer Event Hub-Partition. Die Position kann ein Zeitpunkt der Einreihung in die Warteschlange, ein Offset, eine Sequenznummer, der Anfang des Streams oder das Ende des Streams sein.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Wenn Sie an einer bestimmten Position beginnen (oder enden) möchten, erstellen Sie einfach die richtige EventPosition
, und legen Sie sie in EventHubsConf
fest:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)