Hub eventi di Azure
Hub eventi di Azure è un servizio di inserimento di dati di telemetria su vastissima scala che raccoglie, trasforma e archivia milioni di eventi. In quanto piattaforma di streaming distribuita, offre bassa latenza e tempo di conservazione configurabile, permettendo all'utente di inserire quantità molto elevate di dati di telemetria nel cloud e leggere i dati da più applicazioni usando una semantica di pubblicazione-sottoscrizione.
Questo articolo illustra come usare Structured Streaming con cluster Hub eventi di Azure e Azure Databricks.
Nota
Hub eventi di Azure fornisce un endpoint compatibile con Apache Kafka che è possibile usare con Connettore Kafka structured Streaming, disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Databricks consiglia di usare il connettore Kafka Structured Streaming per elaborare i messaggi da Hub eventi di Azure.
Requisiti
Per il supporto della versione corrente, vedere "Versioni più recenti" nel file leggimi del progetto connettore Spark Hub eventi di Azure.
Creare una libreria nell'area di lavoro di Azure Databricks usando la coordinata Maven
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
.Nota
Questo connettore viene aggiornato regolarmente, potrebbe essere disponibile una versione più recente: si consiglia di eseguire il pull del connettore più recente dal repository Maven
Installare la libreria creata nel cluster.
Schema
Lo schema dei record è:
Column | Type |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
body
viene sempre fornito come matrice di byte. Utilizzare cast("string")
per deserializzare in modo esplicito la colonna body
.
Impostazione
Questa sezione illustra le impostazioni di configurazione necessarie per lavorare con Hub eventi.
Per indicazioni dettagliate sulla configurazione di Structured Streaming con Hub eventi di Azure, vedere Structured Streaming and Hub eventi di Azure Integration Guide sviluppato da Microsoft.
Per indicazioni dettagliate sull'uso di Structured Streaming, vedere Streaming in Azure Databricks.
Stringa di connessione
Per connettersi al servizio Hub eventi, è necessario un stringa di connessione di Hub eventi. È possibile ottenere il stringa di connessione per l'istanza di Hub eventi dal portale di Azure o usando ConnectionStringBuilder
nella libreria.
Portale di Azure
Quando si ottiene il stringa di connessione dal portale di Azure, potrebbe avere o meno la chiave EntityPath
. Tenere in considerazione:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Per connettersi a EventHubs, è necessario che sia presente un oggetto EntityPath
. Se la stringa di connessione non ne ha uno, non preoccuparti.
Questo se ne occuperà:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
In alternativa, è possibile usare ConnectionStringBuilder
per creare la stringa di connessione.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Tutte le configurazioni relative a Hub eventi si verificano in EventHubsConf
. Per creare un oggetto EventHubsConf
, è necessario passare un stringa di connessione:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Per altre informazioni su come ottenere una stringa di connessione valida, vedere Stringhe di connessione.
Per un elenco completo delle configurazioni, vedere EventHubsConf. Ecco un subset di configurazioni per iniziare:
Opzione | Valore | Predefiniti | Tipo di query | Descrizione |
---|---|---|---|---|
consumerGroup |
Stringa | “$Default” | Streaming e batch | Un gruppo di consumer è una vista di un intero hub eventi. I gruppi di consumer consentono a più applicazioni costose di avere una visualizzazione separata del flusso di eventi e di leggere il flusso in modo indipendente in base alle proprie esigenze e con i propri gli offset. Altre informazioni sono disponibili nella documentazione di Microsoft. |
startingPosition |
EventPosition | Avviare lo streaming | Streaming e batch | Posizione iniziale per il processo Structured Streaming. Per informazioni sull'ordine in cui vengono lette le opzioni, vedere startingPositions . |
maxEventsPerTrigger |
long | partitionCount 1000- |
Query di streaming | Limite di frequenza per il numero massimo di eventi elaborati per intervallo di trigger. Il numero totale di eventi specificato verrà suddiviso proporzionalmente tra partizioni di volumi diversi. |
Per ogni opzione esiste un'impostazione corrispondente in EventHubsConf
. Ad esempio:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
consente agli utenti di specificare posizioni iniziale (e finale) con la classe EventPosition
. EventPosition
definisce la posizione di un evento in una partizione di Hub eventi. La posizione può essere un tempo accodato, un offset, un numero di sequenza, l'inizio del flusso o la fine del flusso.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Se si vuole iniziare (o terminare) in una posizione specifica, è sufficiente creare il codice corretto EventPosition
e impostarlo in EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)