Azure Service Bus Clientbibliothek für Java– Version 7.14.4
Microsoft Azure Service Bus ist ein vollständig verwalteter Nachrichtenbroker für die Unternehmensintegration. Mit Service Bus lassen sich Anwendungen und Dienste entkoppeln. Service Bus bietet eine zuverlässige und sichere Plattform für die asynchrone Übertragung von Daten und Zuständen. Die Datenübertragung zwischen verschiedenen Anwendungen und Diensten erfolgt mithilfe von Nachrichten. Wenn Sie mehr über Azure Service Bus erfahren möchten, lesen Sie vielleicht: Was ist Service Bus?
Die Azure Service Bus-Clientbibliothek ermöglicht das Senden und Empfangen von Azure Service Bus Nachrichten und kann für folgende Zwecke verwendet werden:
- Übertragung von Geschäftsdaten (beispielsweise Verkäufe/Bestellungen, Journale oder Bestandsbewegungen)
- Entkoppeln Sie Anwendungen, um die Zuverlässigkeit und Skalierbarkeit von Anwendungen und Diensten zu verbessern. Clients und Dienste müssen nicht gleichzeitig online sein.
- Ermöglichung von 1:n-Beziehungen zwischen Herausgebern und Abonnenten
- Implementierung von Workflows, die die Sortierung oder Verzögerung von Nachrichten erfordern
Quellcode | API-Referenzdokumentation | Produktdokumentation | Proben | Paket (Maven)
Erste Schritte
Voraussetzungen
- Java Development Kit (JDK) mit Version 8 oder höher
- Maven
- Microsoft Azure-Abonnement
- Sie können ein kostenloses Konto erstellen unter: https://azure.microsoft.com
- Azure Service Bus instance
- Schritt-für-Schritt-Anleitung zum Erstellen eines Service Bus-instance im Azure-Portal
Um schnell die erforderlichen Service Bus-Ressourcen in Azure zu erstellen und eine Verbindungszeichenfolge für sie zu erhalten, können Sie unsere Beispielvorlage bereitstellen, indem Sie auf Folgendes klicken:
Einschließen des Pakets
BOM-Datei einfügen
Fügen Sie azure-sdk-bom in Ihr Projekt ein, um von der Allgemeinverfügbarkeitsversion der Bibliothek abhängig zu sein. Ersetzen Sie im folgenden Codeausschnitt den Platzhalter {bom_version_to_target} durch die Versionsnummer. Weitere Informationen zur BOM finden Sie in der INFODATEI FÜR AZURE SDK-STÜCKLISTEN.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
und fügen Sie dann die direkte Abhängigkeit ohne Versions-Tag in den Abschnitt „Abhängigkeit“ ein.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
</dependency>
</dependencies>
Direkte Abhängigkeiten einfügen
Wenn Sie eine Abhängigkeit von einer bestimmten Version der Bibliothek annehmen möchten, die nicht in der BoM vorhanden ist, fügen Sie die direkte Abhängigkeit wie folgt zu Ihrem Projekt hinzu.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.14.4</version>
</dependency>
Authentifizieren des Clients
Damit die Service Bus-Clientbibliothek mit Service Bus interagieren kann, muss sie verstehen, wie eine Verbindung hergestellt und autorisiert werden kann.
Erstellen von Service Bus-Clients mithilfe einer Verbindungszeichenfolge
Die einfachste Möglichkeit für die Authentifizierung ist die Verwendung einer Verbindungszeichenfolge, die automatisch erstellt wird, wenn ein Service Bus-Namespace erstellt wird. Wenn Sie mit SAS-Richtlinien in Azure nicht vertraut sind, können Sie die schrittweise Anleitung befolgen, um eine Service Bus-Verbindungszeichenfolge zu erhalten.
Sowohl die asynchronen als auch die synchronen Service Bus-Absender- und -Empfängerclients werden mit ServiceBusClientBuilder
instanziiert. Die folgenden Codeausschnitte erstellen einen synchronen Service Bus-Absender bzw. einen asynchronen Empfänger.
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.receiver()
.topicName("<< TOPIC NAME >>")
.subscriptionName("<< SUBSCRIPTION NAME >>")
.buildAsyncClient();
Erstellen eines Service Bus-Clients mithilfe von Microsoft Identity Platform (früher Azure Active Directory)
Das Azure SDK für Java unterstützt das Azure Identity-Paket, sodass Anmeldeinformationen aus dem Microsoft Identity Platform einfach abgerufen werden können. Fügen Sie zunächst das Paket hinzu:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.10.1</version>
</dependency>
- Bekanntes Problem: Die pom.xml-Datei sollte vor
azure-identity
Clientbibliotheken aufgeführtazure-messaging-servicebus
werden. Dieses Problem wurde mitazure-identity:1.2.1
behoben. Ausführlichere Informationen finden Sie hier.
Die implementierten Methoden zum Anfordern von Anmeldeinformationen befinden sich unter dem com.azure.identity.credential
Paket. Das folgende Beispiel zeigt, wie Sie einen geheimen Clientschlüssel der Azure Active Directory-Anwendung (AAD) verwenden, um mit Azure Service Bus zu autorisieren.
Autorisieren mit DefaultAzureCredential
Die Autorisierung ist am einfachsten mit DefaultAzureCredential. Es findet die besten Anmeldeinformationen, die in seiner ausgeführten Umgebung verwendet werden können. Weitere Informationen zur Verwendung der Azure Active Directory-Autorisierung mit Service Bus finden Sie in der zugehörigen Dokumentation.
Verwenden Sie die zurückgegebenen Tokenanmeldeinformationen, um den Client zu authentifizieren:
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.credential("<<fully-qualified-namespace>>", credential)
.receiver()
.queueName("<<queue-name>>")
.buildAsyncClient();
Wichtige Begriffe
Sie können mit den primären Ressourcentypen in einem Service Bus-Namespace interagieren, von denen mehrere vorhanden sein können und auf denen die tatsächliche Nachrichtenübertragung erfolgt. Der Namespace dient häufig als Anwendungscontainer:
- Eine Warteschlange ermöglicht das Senden und Empfangen von Nachrichten, geordnet nach first-in-first-out. Es wird häufig für die Punkt-zu-Punkt-Kommunikation verwendet.
- Ein Thema eignet sich besser für Herausgeber- und Abonnentenszenarien. Ein Thema veröffentlicht Nachrichten für Abonnements, von denen mehrere gleichzeitig vorhanden sein können.
- Ein Abonnement empfängt Nachrichten aus einem Thema. Jedes Abonnement ist unabhängig und erhält eine Kopie der Nachricht, die an das Thema gesendet wird.
Service Bus-Clients
Der Generator ServiceBusClientBuilder
wird verwendet, um alle Service Bus-Clients zu erstellen.
ServiceBusSenderClient
Ein synchroner Absender, der für das SendenServiceBusMessage
an eine bestimmte Warteschlange oder ein Bestimmtes Thema in Azure Service Bus.ServiceBusSenderAsyncClient
Ein asynchroner Absender, der für das SendenServiceBusMessage
an eine bestimmte Warteschlange oder ein Bestimmtes Thema in Azure Service Bus.ServiceBusReceiverClient
Ein synchroner Empfänger, der für den EmpfangServiceBusMessage
von einer bestimmten Warteschlange oder einem bestimmten Thema in Azure Service Bus verantwortlich ist.ServiceBusReceiverAsyncClient
Ein asynchroner Empfänger, der für den EmpfangServiceBusMessage
von einer bestimmten Warteschlange oder einem bestimmten Thema auf Azure Service Bus zuständig ist.
Beispiele
- Senden von Nachrichten
- Empfangen von Nachrichten
- Senden und Empfangen von Sitzungsaktivierten Warteschlangen oder Themen
- Erstellen eines Empfängers der Warteschlange für unzustellbare Nachrichten
- Freigeben einer Verbindung zwischen Clients
Senden von Nachrichten
Sie müssen eine asynchrone ServiceBusSenderAsyncClient
oder eine synchrone ServiceBusSenderClient
erstellen, um Nachrichten zu senden. Jeder Absender kann Nachrichten entweder an eine Warteschlange oder ein Thema senden.
Der folgende Codeausschnitt erstellt eine synchrone ServiceBusSenderClient
Zum Veröffentlichen einer Nachricht in einer Warteschlange.
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
new ServiceBusMessage("Hello world").setMessageId("1"),
new ServiceBusMessage("Bonjour").setMessageId("2"));
sender.sendMessages(messages);
// When you are done using the sender, dispose of it.
sender.close();
Empfangen von Nachrichten
Um Nachrichten zu empfangen, müssen Sie eine ServiceBusProcessorClient
mit Rückrufen für eingehende Nachrichten und alle Fehler erstellen, die während des Prozesses auftreten. Anschließend können Sie den Client nach Bedarf starten und beenden.
Beim Empfang einer Nachricht mit dem PeekLock-Modus teilt er dem Broker mit, dass die Anwendungslogik empfangene Nachrichten explizit abgleichen möchte (z. B. abschließen, abbrechen).
// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (Exception completionError) {
System.out.printf("Completion of the message %s failed\n", message.getMessageId());
completionError.printStackTrace();
}
} else {
try {
context.abandon();
} catch (Exception abandonError) {
System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
abandonError.printStackTrace();
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.processor()
.queueName("<< QUEUE NAME >>")
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
Weist den Broker beim Empfang von Nachrichten im ReceiveAndDelete-Modus an, alle Nachrichten, die er an den empfangenden Client sendet, als beim Senden festgelegt zu betrachten.
// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.processor()
.queueName("<< QUEUE NAME >>")
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
Es gibt vier Möglichkeiten zum Abgleichen von Nachrichten mithilfe der Methoden für den Nachrichtenkontext, der an Ihren Rückruf übergeben wird.
- Abgeschlossen: Bewirkt, dass die Nachricht aus der Warteschlange oder dem Thema gelöscht wird.
- Abbrechen: Gibt die Sperre des Empfängers für die Nachricht auf, sodass die Nachricht von anderen Empfängern empfangen werden kann.
- Zurückstellen: Verschiebt den Empfang der Nachricht auf normalem Wege. Um verzögerte Nachrichten zu empfangen, muss die Sequenznummer der Nachricht beibehalten werden.
- Unzustellbarer Buchstabe: Verschiebt die Nachricht in die Warteschlange für unzustellbare Nachrichten. Dadurch wird verhindert, dass die Nachricht erneut empfangen wird. Um Nachrichten aus der Warteschlange für unzustellbare Nachrichten zu empfangen, wird ein Empfänger benötigt, der auf die Warteschlange für unzustellbare Nachrichten festgelegt ist.
Senden und Empfangen von Sitzungsaktivierten Warteschlangen oder Themen
Wenn Sie Sitzungen verwenden, müssen Sie eine sitzungsfähige Warteschlange oder ein Abonnement erstellen. Weitere Informationen dazu finden Sie unter "Nachrichtensitzungen".
Azure Service Bus-Sitzungen ermöglichen die gemeinsame und geordnete Verarbeitung unbegrenzter Sequenzen verwandter Nachrichten. Sitzungen können mit FIFO- (First in, First Out) und Anforderung/Antwort-Mustern verwendet werden. Jeder Absender kann beim Senden von Nachrichten an ein Thema oder eine Warteschlange eine Sitzung erstellen, indem er die ServiceBusMessage.setSessionId(String)
-Eigenschaft auf einen anwendungsdefinierten Bezeichner festlegt, der für die Sitzung eindeutig ist.
Im Gegensatz zu Warteschlangen oder Abonnements, die nicht sitzungsfähig sind, kann nur ein einzelner Empfänger jederzeit aus einer Sitzung lesen. Wenn ein Empfänger eine Sitzung abruft, sperrt Service Bus die Sitzung für diesen Empfänger und hat exklusiven Zugriff auf Nachrichten in dieser Sitzung.
Senden einer Nachricht an eine Sitzung
Erstellen Sie eine für eine sitzungsfähige Warteschlange oder ein ServiceBusSenderClient
Themenabonnement. Die Einstellung ServiceBusMessage.setSessionId(String)
für ein ServiceBusMessage
veröffentlicht die Nachricht in dieser Sitzung. Wenn die Sitzung nicht vorhanden ist, wird sie erstellt.
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
sender.sendMessage(message);
Empfangen von Nachrichten aus einer Sitzung
Der Empfang von Nachrichten von Sitzungen ähnelt dem Empfangen von Nachrichten aus einer nicht sitzungsfähigen Warteschlange oder einem Abonnement. Der Unterschied besteht im Generator und der von Ihnen verwendeten Klasse.
In Nichtsitzungsfällen würden Sie den Unter-Generator processor()
verwenden. Bei Sitzungen würden Sie den Unter-Generator sessionProcessor()
verwenden. Beide Unterbuilder erstellen eine instance von, die ServiceBusProcessorClient
für die Arbeit an einer Sitzung oder einer Service Bus-Entität ohne Sitzung konfiguriert ist. Im Fall des Sitzungsprozessors können Sie auch die maximale Anzahl von Sitzungen übergeben, die der Prozessor gleichzeitig verarbeiten soll.
Erstellen einer Unzustellbare Warteschlange Empfänger
Azure Service Bus Warteschlangen und Themenabonnements stellen eine sekundäre Unterwarteschlange bereit, die als Dead-Letter-Warteschlange (DLQ) bezeichnet wird. Die Warteschlange für unzustellbare Nachrichten muss nicht explizit erstellt und kann nicht gelöscht oder anderweitig unabhängig von der Hauptentität verwaltet werden. Für Abonnements mit aktivierter Sitzungs- oder Nicht-Sitzungswarteschlange oder Themen kann der Empfänger für unzustellbare Nachrichten auf die gleiche Weise wie unten dargestellt erstellt werden. Weitere Informationen zur Warteschlange für unzustellbare Nachrichten finden Sie hier.
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.receiver() // Use this for session or non-session enabled queue or topic/subscriptions
.topicName("<< TOPIC NAME >>")
.subscriptionName("<< SUBSCRIPTION NAME >>")
.subQueue(SubQueue.DEAD_LETTER_QUEUE)
.buildClient();
Gemeinsame Nutzung der Verbindung zwischen Clients
Für das Erstellen einer physischen Verbindung mit Service Bus sind Ressourcen erforderlich. Eine Anwendung sollte die Verbindung gemeinsam nutzen.
zwischen Clients, die durch gemeinsame Nutzung des Generators der obersten Ebene erreicht werden können, wie unten gezeigt.
// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
.receiver()
.queueName("<< QUEUE NAME >>")
.buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
Wann sollte "ServiceBusProcessorClient" verwendet werden.
Wann sollte "ServiceBusProcessorClient", "ServiceBusReceiverClient" oder ServiceBusReceiverAsyncClient verwendet werden? Der Prozessor wird mit "ServiceBusReceiverAsyncClient" erstellt und bietet eine bequeme Möglichkeit zum Empfangen von Nachrichten mit standardmäßig automatischer Vervollständigung und automatischer Verlängerung von Nachrichtensperren im "PEEK_LOCK"-Modus. Der Prozessor ist geeignet, wenn die Anwendungen nicht vollständig auf den asynchronen Empfängerclient umgestellt haben und Nachrichten im synchronen Modus verarbeiten möchten.
Der Prozessor empfängt ewig Nachrichten, da er intern von den Netzwerkfehlern wiederhergestellt wird.
Der Funktionsaufruf "ServiceBusProcessorClient:processMessage()" wird für jede Nachricht ausgeführt. Alternativ können Sie auch "ServiceBusReceiverClient" verwenden, da es sich um einen Client auf niedrigerer Ebene handelt und eine größere Palette von APIs bereitstellt. Wenn die asynchrone Verarbeitung
für Ihre Anwendung geeignet, können Sie "ServiceBusReceiverAsyncClient" verwenden.
Problembehandlung
Aktivieren der Clientprotokollierung
Das Azure SDK für Java bietet eine konsistente Protokollierung, die bei der Problembehandlung von Anwendungsfehlern und deren Lösung hilft. Die erstellten Protokolle erfassen den Flow einer Anwendung, bevor sie den Endzustand erreichen. Dies trägt zur Ermittlung der Grundursache bei. Informationen zum Aktivieren der Protokollierung finden Sie im Protokollierungswiki.
Aktivieren der AMQP-Transportprotokollierung
Wenn die Aktivierung der Clientprotokollierung nicht ausreicht, um Ihre Probleme zu diagnostizieren. Sie können die Protokollierung für eine Datei in der zugrunde liegenden AMQP-Bibliothek Qpid Proton-J aktivieren. Qpid Proton-J verwendet java.util.logging
. Sie können die Protokollierung aktivieren, indem Sie eine Konfigurationsdatei mit dem folgenden Inhalt erstellen. Oder legen Sie proton.trace.level=ALL
fest, welche Konfigurationsoptionen Sie für die java.util.logging.Handler
Implementierung benötigen. Implementierungsklassen und ihre Optionen finden Sie im Java 8 SDK javadoc.
Um die AMQP-Transportframes zu verfolgen, legen Sie die Umgebungsvariable fest: PN_TRACE_FRM=1
.
Beispieldatei "logging.properties"
Die folgende Konfigurationsdatei protokolliert die Ablaufverfolgungsausgabe von proton-j in die Datei "proton-trace.log".
handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n
Allgemeine Ausnahmen
AMQP-Ausnahme
Dies ist eine allgemeine Ausnahme für AMQP-bezogene Fehler. Dies umfasst die AMQP-Fehler als ErrorCondition
und den Kontext, der diese Ausnahme als AmqpErrorContext
verursacht hat. isTransient
ist ein boolescher Wert, der angibt, ob es sich bei der Ausnahme um einen vorübergehenden Fehler handelt oder nicht. Wenn eine vorübergehende AMQP-Ausnahme auftritt, wiederholt die Clientbibliothek den Vorgang so oft, wie amqpRetryOptions dies zulässt. Nachher schlägt der Vorgang fehl, und eine Ausnahme wird zurück an den Benutzer weitergegeben.
AmqpErrorCondition
enthält Fehlerbedingungen, die für das AMQP-Protokoll gelten und von Azure-Diensten verwendet werden. Wenn eine AMQP-Ausnahme ausgelöst wird, kann das Untersuchen des Fehlerbedingungsfelds Entwickler darüber informieren, warum die AMQP-Ausnahme aufgetreten ist und wenn möglich, wie diese Ausnahme entschärft werden kann. Eine Liste aller AMQP-Ausnahmen finden Sie unter OASIS AMQP Version 1.0 Transportfehler.
Die empfohlene Methode zum Beheben der spezifischen Ausnahme, die die AMQP-Ausnahme darstellt, besteht darin, den Leitfaden für Service Bus Messaging-Ausnahmen zu befolgen.
Grundlegendes zum APIs-Verhalten
Das vorliegende Dokument bietet Einblicke in das erwartete Verhalten der synchronen receiveMessages
API, wenn sie zum Abrufen von mehr als einer Nachricht verwendet wird (auch implizites Prefetching).
Nächste Schritte
Darüber hinaus bietet die Azure Service Bus Clientbibliothek Unterstützung für viele weitere Szenarien, um den vollständigen Featuresatz des Azure Service Bus-Diensts zu nutzen. Um einige dieser Szenarien zu erkunden, ist der folgende Satz von Beispielen hier verfügbar.
Mitwirken
Wenn Sie ein aktiver Mitwirkender zu diesem Projekt werden möchten, lesen Sie bitte unsere Beitragsrichtlinien für weitere Informationen.