Hub eventi di Azure

Hub eventi di Azure è un servizio di inserimento di dati di telemetria su vastissima scala che raccoglie, trasforma e archivia milioni di eventi. In quanto piattaforma di streaming distribuita, offre bassa latenza e tempo di conservazione configurabile, permettendo all'utente di inserire quantità molto elevate di dati di telemetria nel cloud e leggere i dati da più applicazioni usando una semantica di pubblicazione-sottoscrizione.

Questo articolo illustra come usare Structured Streaming con cluster Hub eventi di Azure e Azure Databricks.

Nota

Hub eventi di Azure fornisce un endpoint compatibile con Apache Kafka che è possibile usare con Connettore Kafka structured Streaming, disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Databricks consiglia di usare il connettore Kafka Structured Streaming per elaborare i messaggi da Hub eventi di Azure.

Requisiti

Per il supporto della versione corrente, vedere "Versioni più recenti" nel file leggimi del progetto connettore Spark Hub eventi di Azure.

  1. Creare una libreria nell'area di lavoro di Azure Databricks usando la coordinata Maven com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Nota

    Questo connettore viene aggiornato regolarmente, potrebbe essere disponibile una versione più recente: si consiglia di eseguire il pull del connettore più recente dal repository Maven

  2. Installare la libreria creata nel cluster.

Schema

Lo schema dei record è:

Column Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body viene sempre fornito come matrice di byte. Utilizzare cast("string") per deserializzare in modo esplicito la colonna body.

Impostazione

Questa sezione illustra le impostazioni di configurazione necessarie per lavorare con Hub eventi.

Per indicazioni dettagliate sulla configurazione di Structured Streaming con Hub eventi di Azure, vedere Structured Streaming and Hub eventi di Azure Integration Guide sviluppato da Microsoft.

Per indicazioni dettagliate sull'uso di Structured Streaming, vedere Streaming in Azure Databricks.

Stringa di connessione

Per connettersi al servizio Hub eventi, è necessario un stringa di connessione di Hub eventi. È possibile ottenere il stringa di connessione per l'istanza di Hub eventi dal portale di Azure o usando ConnectionStringBuilder nella libreria.

Portale di Azure

Quando si ottiene il stringa di connessione dal portale di Azure, potrebbe avere o meno la chiave EntityPath. Tenere in considerazione:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Per connettersi a EventHubs, è necessario che sia presente un oggetto EntityPath. Se la stringa di connessione non ne ha uno, non preoccuparti. Questo se ne occuperà:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

In alternativa, è possibile usare ConnectionStringBuilder per creare la stringa di connessione.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Tutte le configurazioni relative a Hub eventi si verificano in EventHubsConf. Per creare un oggetto EventHubsConf, è necessario passare un stringa di connessione:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Per altre informazioni su come ottenere una stringa di connessione valida, vedere Stringhe di connessione.

Per un elenco completo delle configurazioni, vedere EventHubsConf. Ecco un subset di configurazioni per iniziare:

Opzione Valore Predefiniti Tipo di query Descrizione
consumerGroup Stringa “$Default” Streaming e batch Un gruppo di consumer è una vista di un intero hub eventi. I gruppi di consumer consentono a più applicazioni costose di avere una visualizzazione separata del flusso di eventi e di leggere il flusso in modo indipendente in base alle proprie esigenze e con i propri gli offset. Altre informazioni sono disponibili nella documentazione di Microsoft.
startingPosition EventPosition Avviare lo streaming Streaming e batch Posizione iniziale per il processo Structured Streaming. Per informazioni sull'ordine in cui vengono lette le opzioni, vedere startingPositions .
maxEventsPerTrigger long partitionCount

1000-
Query di streaming Limite di frequenza per il numero massimo di eventi elaborati per intervallo di trigger. Il numero totale di eventi specificato verrà suddiviso proporzionalmente tra partizioni di volumi diversi.

Per ogni opzione esiste un'impostazione corrispondente in EventHubsConf. Ad esempio:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf consente agli utenti di specificare posizioni iniziale (e finale) con la classe EventPosition. EventPosition definisce la posizione di un evento in una partizione di Hub eventi. La posizione può essere un tempo accodato, un offset, un numero di sequenza, l'inizio del flusso o la fine del flusso.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Se si vuole iniziare (o terminare) in una posizione specifica, è sufficiente creare il codice corretto EventPosition e impostarlo in EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)