Azure Event Hubs Clientbibliothek für Java – Version 5.16.0

Azure Event Hubs ist ein hochgradig skalierbarer Publish-Subscribe-Dienst, der Millionen von Ereignissen pro Sekunde erfassen und an mehrere Verbraucher streamen kann. Auf diese Weise können Sie die enormen Datenmengen verarbeiten und analysieren, die von Ihren verbundenen Geräten und Anwendungen erzeugt werden. Sobald Event Hubs die Daten gesammelt hat, können Sie sie mithilfe eines beliebigen Echtzeitanalyseanbieters oder mit Batching-/Speicheradaptern abrufen, transformieren und speichern. Wenn Sie mehr über Azure Event Hubs erfahren möchten, sollten Sie folgendes lesen: Was ist Event Hubs?

Die Azure Event Hubs-Clientbibliothek ermöglicht die Veröffentlichung und Nutzung von Azure Event Hubs-Ereignissen und kann für Folgendes verwendet werden:

  • Geben Sie Telemetriedaten zu Ihrer Anwendung für Business Intelligence- und Diagnose-Zwecke aus.
  • Veröffentlichen Sie Fakten zum Status Ihrer Anwendung, die von interessierten Parteien beobachtet und als Auslöser für die Ausführung von Aktionen verwendet werden können.
  • Beobachten Sie interessante Vorgänge und Interaktionen innerhalb Ihres Unternehmens oder anderen Ökosystems, sodass lose gekoppelte Systeme interagieren können, ohne sie miteinander verbinden zu müssen.
  • Empfangen Sie Ereignisse von einem oder mehreren Herausgebern, transformieren Sie sie, um die Anforderungen Ihres Ökosystems besser zu erfüllen, und veröffentlichen Sie die transformierten Ereignisse dann in einem neuen Stream, den Consumer beobachten können.

Quellcode | API-Referenzdokumentation | Produktdokumentation | Proben | Problembehandlung

Inhaltsverzeichnis

Erste Schritte

Voraussetzungen

Einschließen des Pakets

BOM-Datei einfügen

Fügen Sie das azure-sdk-bom in Ihr Projekt ein, um die Abhängigkeit von der General Availability (GA)-Version der Bibliothek zu übernehmen. Ersetzen Sie im folgenden Codeausschnitt den Platzhalter {bom_version_to_target} durch die Versionsnummer. Weitere Informationen zur Stückliste finden Sie in der AZURE SDK-BOM-INFODATEI.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

und fügen Sie dann die direkte Abhängigkeit in den Abschnitt abhängigkeiten ohne das Versionstag ein, wie unten gezeigt.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

Direkte Abhängigkeiten einfügen

Wenn Sie abhängigkeiten von einer bestimmten Version der Bibliothek übernehmen möchten, die in der Stückliste nicht vorhanden ist, fügen Sie die direkte Abhängigkeit wie folgt zu Ihrem Projekt hinzu.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

Authentifizieren des Clients

Damit die Event Hubs-Clientbibliothek mit einem Event Hub interagieren kann, muss sie verstehen, wie eine Verbindung hergestellt und autorisiert werden kann.

Erstellen eines Event Hub-Producers mithilfe einer Verbindungszeichenfolge

Die einfachste Möglichkeit hierfür ist die Verwendung einer Verbindungszeichenfolge, die beim Erstellen eines Event Hubs-Namespaces automatisch erstellt wird. Wenn Sie mit Shared Access-Richtlinien in Azure nicht vertraut sind, sollten Sie die schrittweise Anleitung befolgen, um eine Event Hubs-Verbindungszeichenfolge abzurufen.

Sowohl der asynchrone als auch der synchrone Event Hub-Producer- als auch der Consumerclient können mit EventHubClientBuildererstellt werden. Beim Aufrufen build*Client() wird ein synchroner Producer oder Consumer erstellt, während dessen build*AsyncClient() asynchrones Pendant erstellt wird.

Mit dem folgenden Codeausschnitt wird ein synchroner Event Hub-Producer erstellt.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Erstellen eines Event Hub-Clients mit Microsoft Identity Platform (früher Azure Active Directory)

Das Azure SDK für Java unterstützt ein Azure Identity-Paket, das das Abrufen von Anmeldeinformationen von Microsoft Identity Platform vereinfacht. Fügen Sie zunächst das Paket hinzu:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

Alle implementierten Möglichkeiten zum Anfordern von Anmeldeinformationen finden Sie unter dem com.azure.identity.credential Paket. Im folgenden Beispiel wird gezeigt, wie Sie einen geheimen Clientschlüssel der Azure Active Directory-Anwendung (AAD) verwenden, um sich mit Azure Event Hubs zu autorisieren.

Autorisieren mit DefaultAzureCredential

Die Autorisierung ist am einfachsten mit DefaultAzureCredential. Es findet die besten Anmeldeinformationen, die in seiner ausgeführten Umgebung verwendet werden können. Weitere Informationen zur Verwendung der Azure Active Directory-Autorisierung mit Event Hubs finden Sie in der zugehörigen Dokumentation.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Wichtige Begriffe

  • Ein Event Hub-Produzent ist eine Quelle von Telemetriedaten, Diagnose Informationen, Nutzungsprotokollen oder anderen Protokolldaten als Teil einer eingebetteten Gerätelösung, einer mobilen Geräteanwendung, eines Spieltitels, der auf einer Konsole oder einem anderen Gerät ausgeführt wird, einer client- oder serverbasierten Geschäftslösung oder einer Website.

  • Ein Event Hub-Consumer erfasst solche Informationen aus dem Event Hub und verarbeitet sie. Die Verarbeitung kann Aggregation, komplexe Berechnung und Filterung umfassen. Die Verarbeitung kann auch eine unformatierte oder transformierte Verteilung oder Speicherung der Informationen umfassen. Event Hub-Consumer sind häufig robuste und umfangreiche Komponenten der Plattforminfrastruktur mit integrierten Analysefunktionen wie Azure Stream Analytics, Apache Spark oder Apache Storm.

  • Eine Partition ist eine geordnete Sequenz Abfolge von Ereignissen, die in einem Event Hub erfolgt. Azure Event Hubs ermöglicht Nachrichtenstreaming über ein partitioniertes Consumermuster, bei dem jeder Consumer nur eine bestimmte Teilmenge oder Partition des Nachrichtendatenstroms liest. Neu eingehende Ereignisse werden am Ende dieser Sequenz hinzugefügt. Die Anzahl der Partitionen wird zum Zeitpunkt der Erstellung eines Event Hubs angegeben und kann nicht geändert werden.

  • Eine Consumergruppe ist eine Ansicht eines vollständigen Event Hubs. Consumergruppen ermöglichen es mehreren verarbeitenden Anwendungen, jeweils eine eigene Ansicht des Ereignisdatenstroms zu verwenden und den Datenstrom unabhängig voneinander in ihrem eigenen Tempo und von ihrer eigenen Position aus zu lesen. Pro Consumergruppe können höchstens fünf gleichzeitige Leser auf einer Partition vorhanden sein. Es wird jedoch empfohlen, nur einen aktiven Consumer für eine bestimmte Partitions- und Consumergruppenkopplung zu verwenden. Jeder aktive Leser empfängt die Ereignisse von seiner Partition. wenn mehrere Reader auf derselben Partition vorhanden sind, erhalten sie doppelte Ereignisse.

Weitere Konzepte und ausführlichere Diskussionen finden Sie unter Event Hubs-Features. Außerdem sind die Konzepte für AMQP in OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0 gut dokumentiert.

Beispiele

Veröffentlichen von Ereignissen für einen Event Hub

Zum Veröffentlichen von Ereignissen müssen Sie eine asynchrone EventHubProducerAsyncClient oder eine synchrone EventHubProducerClienterstellen. Jeder Producer kann Ereignisse an eine bestimmte Partition senden oder dem Event Hubs-Dienst erlauben, zu entscheiden, in welchen Partitionsereignissen veröffentlicht werden soll. Es wird empfohlen, automatisches Routing zu verwenden, wenn die Veröffentlichung von Ereignissen hochverfügbar sein muss oder wenn Ereignisdaten gleichmäßig auf die Partitionen verteilt werden sollen.

Erstellen eines Event Hub-Producers und Veröffentlichen von Ereignissen

Entwickler können einen Producer erstellen, indem sie EventHubClientBuilder verwenden und aufrufen buildProducer*Client(). Wenn Sie angeben, CreateBatchOptions.setPartitionId(String) werden Ereignisse an eine bestimmte Partition gesendet. Wenn partitionId nicht angegeben wird, werden Ereignisse automatisch an eine Partition weitergeleitet. Wenn Sie angeben, CreateBatchOptions.setPartitionKey(String) wird der Event Hubs-Dienst aufgefordert, die Ereignisse zu hashen und an dieselbe Partition zu senden.

Der folgende Codeausschnitt erstellt einen synchronen Producer und sendet Ereignisse an eine beliebige Partition, sodass der Event Hubs-Dienst das Ereignis an eine verfügbare Partition weiterleiten kann.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

Beachten Sie, dass EventDataBatch.tryAdd(EventData) nicht threadsicher ist. Stellen Sie sicher, dass Sie den Methodenzugriff synchronisieren, wenn Sie mehrere Threads zum Hinzufügen von Ereignissen verwenden.

Veröffentlichen von Ereignissen mithilfe des Partitionsbezeichners

Viele Event Hub-Vorgänge finden im Bereichs einer bestimmten Partition statt. Jeder Client kann oder getEventHubProperties() aufrufengetPartitionIds(), um die Partitions-IDs und Metadaten in seiner Event Hub-instance abzurufen.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

Veröffentlichen von Ereignissen mithilfe des Partitionsschlüssels

Wenn eine Gruppe von Ereignissen keiner bestimmten Partition zugeordnet ist, kann es wünschenswert sein, anzufordern, dass der Event Hubs-Dienst verschiedene Ereignisse oder Ereignisbatches zusammen auf derselben Partition behält. Dies kann erreicht werden, indem beim Veröffentlichen der Ereignisse ein partition key festgelegt wird. Im folgenden Szenario beziehen sich alle Ereignisse auf Städte, sodass sie gesendet werden, wobei der Partitionsschlüssel auf "cities" festgelegt ist.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

Nutzen von Ereignissen aus einer Event Hub-Partition

Um Ereignisse zu nutzen, erstellen Sie ein EventHubConsumerAsyncClient oder EventHubConsumerClient für eine bestimmte Consumergruppe. Darüber hinaus muss ein Consumer angeben, wo im Ereignisstream mit dem Empfang von Ereignissen begonnen werden soll.

Nutzen von Ereignissen mit EventHubConsumerAsyncClient

Im folgenden Codeausschnitt erstellen wir einen asynchronen Consumer, der Ereignisse von partitionId empfängt und nur auf die neuesten Ereignisse lauscht, die in die Partition gepusht werden. Entwickler können mit dem Empfangen von Ereignissen von mehreren Partitionen beginnen, indem sie mit einer EventHubConsumerAsyncClient anderen Partitions-ID aufrufen receiveFromPartition(String, EventPosition) .

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

Nutzen von Ereignissen mit EventHubConsumerClient

Entwickler können mithilfe von EventHubConsumerClienteinen synchronen Consumer erstellen, der Ereignisse in Batches zurückgibt. Im folgenden Codeausschnitt wird ein Consumer erstellt, der mit dem Lesen von Ereignissen am Anfang des Ereignisdatenstroms der Partition beginnt.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

Nutzen von Ereignissen mithilfe eines EventProcessorClient

Um Ereignisse für alle Partitionen eines Event Hubs zu nutzen, können Sie eine EventProcessorClient für eine bestimmte Consumergruppe erstellen.

Die EventProcessorClient delegieren die Verarbeitung von Ereignissen an eine von Ihnen bereitgestellte Rückruffunktion, sodass Sie sich auf die Logik konzentrieren können, die zum Bereitstellen von Werten erforderlich ist, während der Prozessor für die Verwaltung der zugrunde liegenden Consumervorgänge verantwortlich ist.

In unserem Beispiel konzentrieren wir uns auf das Erstellen von EventProcessorClient, verwenden die SampleCheckpointStore in Beispielen verfügbaren und eine Rückruffunktion, die vom Event Hub empfangene Ereignisse verarbeitet und in die Konsole schreibt. Für Produktionsanwendungen wird empfohlen, einen dauerhaften Speicher wie Checkpoint Store mit Azure Storage-Blobs zu verwenden.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

Problembehandlung

Siehe TROUBLESHOOTING.md.

Nächste Schritte

Neben den behandelten Bietet die Azure Event Hubs-Clientbibliothek Unterstützung für viele andere Szenarien, um den vollständigen Funktionsumfang des Azure Event Hubs-Diensts zu nutzen. Informationen zu einigen dieser Szenarien finden Sie in der Infodatei zu Beispielen.

Mitwirken

Wenn Sie ein aktiver Mitwirkender zu diesem Projekt werden möchten, lesen Sie bitte unsere Beitragsrichtlinien für weitere Informationen.

Aufrufe