ServiceBusReceiver Klasse
Die ServiceBusReceiver-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Nachrichten aus dem Azure Service Bus Queue oder Topic Subscription.
Die beiden primären Kanäle für den Nachrichtenempfang sind receive(), um eine einzelne Anforderung für Nachrichten zu stellen, und asynchron für Nachrichten im Empfänger: um eingehende Nachrichten kontinuierlich zu empfangen.
Verwenden Sie die get_<queue/subscription>_receiver
Methode von ~azure.servicebus.aio.ServiceBusClient, um eine ServiceBusReceiver-instance zu erstellen.
- Vererbung
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Konstruktor
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
Parameter
- fully_qualified_namespace
- str
Der vollqualifizierte Hostname für den Service Bus-Namespace. Das Namespaceformat ist . servicebus.windows.net.
- credential
- AsyncTokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren, oder alternativ kann auch ein AzureSasCredential-Objekt bereitgestellt werden.
- queue_name
- str
Der Pfad einer bestimmten Service Bus-Warteschlange, mit der der Client eine Verbindung herstellt.
- topic_name
- str
Der Pfad eines bestimmten Service Bus-Themas, das das Abonnement enthält, mit dem der Client eine Verbindung herstellt.
- subscription_name
- str
Der Pfad eines bestimmten Service Bus-Abonnements unter dem angegebenen Thema, mit dem der Client eine Verbindung herstellt.
- receive_mode
- Union[ServiceBusReceiveMode, str]
Der Modus, in dem Nachrichten von der Entität abgerufen werden. Die beiden Optionen sind PEEK_LOCK und RECEIVE_AND_DELETE. Nachrichten, die mit PEEK_LOCK empfangen werden, müssen innerhalb eines bestimmten Sperrzeitraums abgerechnet werden, bevor sie aus der Warteschlange entfernt werden. Nachrichten, die mit RECEIVE_AND_DELETE empfangen werden, werden sofort aus der Warteschlange entfernt und können anschließend nicht abgebrochen oder erneut empfangen werden, wenn der Client die Nachricht nicht verarbeitet. Der Standardmodus ist PEEK_LOCK.
Das Timeout in Sekunden zwischen empfangenen Nachrichten, nach dem der Empfänger automatisch den Empfang beendet. Der Standardwert ist None, d. h. kein Timeout.
- logging_enable
- bool
Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.
- transport_type
- TransportType
Der Typ des Transportprotokolls, das für die Kommunikation mit dem Service Bus-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp.
- http_proxy
- Dict
HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".
- user_agent
- str
Wenn angegeben, wird dies vor der integrierten Benutzer-Agent-Zeichenfolge hinzugefügt.
- auto_lock_renewer
- Optional[AutoLockRenewer]
Ein ~azure.servicebus.aio.AutoLockRenewer kann so bereitgestellt werden, dass Nachrichten beim Empfang automatisch registriert werden. Wenn der Empfänger ein Sitzungsempfänger ist, gilt er stattdessen für die Sitzung.
- prefetch_count
- int
Die maximale Anzahl von Nachrichten, die mit jeder Anforderung an den Dienst zwischengespeichert werden sollen. Diese Einstellung ist nur für die erweiterte Leistungsoptimierung vorgesehen. Wenn Sie diesen Wert erhöhen, wird die Leistung des Nachrichtendurchsatzes verbessert, aber die Gefahr erhöht, dass Nachrichten ablaufen, während sie zwischengespeichert werden, wenn sie nicht schnell genug verarbeitet werden. Der Standardwert ist 0, d. h. Nachrichten werden vom Dienst empfangen und nacheinander verarbeitet. Im Fall, dass prefetch_count 0 ist, versucht ServiceBusReceiver.receive , max_message_count (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern.
- client_identifier
- str
Ein Zeichenfolgenbasierter Bezeichner zur eindeutigen Identifizierung des Clients instance. Service Bus ordnet sie einigen Fehlermeldungen zu, um die Korrelation von Fehlern zu vereinfachen. Wenn nicht angegeben, wird eine eindeutige ID generiert.
- socket_timeout
- float
Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn Verbindungsfehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden.
Variablen
- fully_qualified_namespace
- str
Der vollqualifizierte Hostname für den Service Bus-Namespace. Das Namespaceformat ist .servicebus.windows.net.
- entity_path
- str
Der Pfad der Entität, mit der der Client eine Verbindung herstellt.
Methoden
abandon_message |
Verlassen Sie die Nachricht. Diese Nachricht wird an die Warteschlange zurückgegeben und für den Erneuten Empfang zur Verfügung gestellt. |
close | |
complete_message |
Vervollständigen Sie die Nachricht. Dadurch wird die Nachricht aus der Warteschlange entfernt. |
dead_letter_message |
Verschieben Sie die Nachricht in die Warteschlange für unzustellbare Nachrichten. Die Warteschlange für unzustellbare Nachrichten ist eine Untergeordnete Warteschlange, die zum Speichern von Nachrichten verwendet werden kann, die nicht ordnungsgemäß verarbeitet werden konnten oder anderweitig eine weitere Überprüfung oder Verarbeitung erfordern. Die Warteschlange kann auch so konfiguriert werden, dass abgelaufene Nachrichten an die Warteschlange für unzustellbare Nachrichten gesendet werden. |
defer_message |
Verschiebt die Nachricht. Diese Nachricht verbleibt in der Warteschlange, muss jedoch speziell von ihrer Sequenznummer angefordert werden, um empfangen zu werden. |
peek_messages |
Durchsuchen von Nachrichten, die derzeit in der Warteschlange ausstehen. Eingesehene Nachrichten werden weder aus der Warteschlange entfernt noch gesperrt. Sie können nicht abgeschlossen, verzögert oder unzustellbar sein. |
receive_deferred_messages |
Empfangen von Nachrichten, die zuvor verzögert wurden. Wenn Sie verzögerte Nachrichten von einer partitionierten Entität empfangen, müssen alle angegebenen Sequenznummern Nachrichten aus derselben Partition sein. |
receive_messages |
Empfangen sie einen Batch von Nachrichten gleichzeitig. Dieser Ansatz ist optimal, wenn Sie mehrere Nachrichten gleichzeitig verarbeiten oder einen Ad-hoc-Empfang als einzelnen Anruf ausführen möchten. Beachten Sie, dass die Anzahl der in einem einzelnen Batch abgerufenen Nachrichten davon abhängt, ob prefetch_count für den Empfänger festgelegt wurde. Wenn prefetch_count nicht für den Empfänger festgelegt ist, versucht der Empfänger, max_message_count Nachrichten (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern. Dieser Aufruf priorisiert die schnelle Rückgabe gegenüber der Erfüllung einer angegebenen Batchgröße und wird daher zurückgegeben, sobald mindestens eine Nachricht empfangen wird und unabhängig von der angegebenen Batchgröße eine Lücke bei eingehenden Nachrichten besteht. |
renew_message_lock |
Erneuern Sie die Nachrichtensperre. Dadurch wird die Sperre für die Nachricht beibehalten, um sicherzustellen, dass sie nicht an die neu zu verarbeitende Warteschlange zurückgegeben wird. Um die Nachricht abzuschließen (oder anderweitig zu begleichen), muss die Sperre beibehalten werden und kann nicht bereits abgelaufen sein. eine abgelaufene Sperre kann nicht verlängert werden. Nachrichten, die über RECEIVE_AND_DELETE-Modus empfangen werden, sind nicht gesperrt und können daher nicht verlängert werden. Dieser Vorgang ist auch nur für nicht sitzungsbehaftete Nachrichten verfügbar. |
abandon_message
Verlassen Sie die Nachricht.
Diese Nachricht wird an die Warteschlange zurückgegeben und für den Erneuten Empfang zur Verfügung gestellt.
async abandon_message(message: ServiceBusReceivedMessage) -> None
Parameter
- message
- ServiceBusReceivedMessage
Die empfangene Nachricht, die abgebrochen werden soll.
Rückgabetyp
Ausnahmen
Beispiele
Geben Sie eine empfangene Nachricht ab.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
Ausnahmen
complete_message
Vervollständigen Sie die Nachricht.
Dadurch wird die Nachricht aus der Warteschlange entfernt.
async complete_message(message: ServiceBusReceivedMessage) -> None
Parameter
- message
- ServiceBusReceivedMessage
Die empfangene Nachricht, die abgeschlossen werden soll.
Rückgabetyp
Ausnahmen
Beispiele
Vervollständigen Sie eine empfangene Nachricht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Verschieben Sie die Nachricht in die Warteschlange für unzustellbare Nachrichten.
Die Warteschlange für unzustellbare Nachrichten ist eine Untergeordnete Warteschlange, die zum Speichern von Nachrichten verwendet werden kann, die nicht ordnungsgemäß verarbeitet werden konnten oder anderweitig eine weitere Überprüfung oder Verarbeitung erfordern. Die Warteschlange kann auch so konfiguriert werden, dass abgelaufene Nachrichten an die Warteschlange für unzustellbare Nachrichten gesendet werden.
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
Parameter
Die detaillierte Fehlerbeschreibung für unzustellbare Nachrichten.
Rückgabetyp
Ausnahmen
Beispiele
Unzustellbarer Brief einer empfangenen Nachricht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Verschiebt die Nachricht.
Diese Nachricht verbleibt in der Warteschlange, muss jedoch speziell von ihrer Sequenznummer angefordert werden, um empfangen zu werden.
async defer_message(message: ServiceBusReceivedMessage) -> None
Parameter
Rückgabetyp
Ausnahmen
Beispiele
Zurückstellen einer empfangenen Nachricht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Durchsuchen von Nachrichten, die derzeit in der Warteschlange ausstehen.
Eingesehene Nachrichten werden weder aus der Warteschlange entfernt noch gesperrt. Sie können nicht abgeschlossen, verzögert oder unzustellbar sein.
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameter
- max_message_count
- int
Die maximale Anzahl von Nachrichten, die versucht und eingesehen werden sollen. Der Standardwert ist 1.
- sequence_number
- int
Eine Nachrichtensequenznummer, von der aus das Durchsuchen von Nachrichten gestartet werden soll.
Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.
Gibt zurück
Eine Liste der ~azure.servicebus.ServiceBusReceivedMessage-Objekte.
Rückgabetyp
Ausnahmen
Beispiele
Anzeigen von Nachrichten in der Warteschlange
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Empfangen von Nachrichten, die zuvor verzögert wurden.
Wenn Sie verzögerte Nachrichten von einer partitionierten Entität empfangen, müssen alle angegebenen Sequenznummern Nachrichten aus derselben Partition sein.
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameter
Eine Liste der Sequenznummern von Nachrichten, die verzögert wurden.
Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.
Gibt zurück
Eine Liste der empfangenen Nachrichten.
Rückgabetyp
Ausnahmen
Beispiele
Empfangen von verzögerten Nachrichten von ServiceBus.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Empfangen sie einen Batch von Nachrichten gleichzeitig.
Dieser Ansatz ist optimal, wenn Sie mehrere Nachrichten gleichzeitig verarbeiten oder einen Ad-hoc-Empfang als einzelnen Anruf ausführen möchten.
Beachten Sie, dass die Anzahl der in einem einzelnen Batch abgerufenen Nachrichten davon abhängt, ob prefetch_count für den Empfänger festgelegt wurde. Wenn prefetch_count nicht für den Empfänger festgelegt ist, versucht der Empfänger, max_message_count Nachrichten (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern.
Dieser Aufruf priorisiert die schnelle Rückgabe gegenüber der Erfüllung einer angegebenen Batchgröße und wird daher zurückgegeben, sobald mindestens eine Nachricht empfangen wird und unabhängig von der angegebenen Batchgröße eine Lücke bei eingehenden Nachrichten besteht.
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
Parameter
Maximale Anzahl von Nachrichten im Batch. Die tatsächliche zurückgegebene Zahl hängt von prefetch_count Größe und eingehender Datenstromrate ab. Die Einstellung auf Keine hängt vollständig von der Vorabrufkonfiguration ab. Der Standardwert ist 1.
Maximale Wartezeit in Sekunden, bis die erste Nachricht eintrifft. Wenn keine Nachrichten eingehen und kein Timeout angegeben wird, wird dieser Aufruf erst zurückgegeben, wenn die Verbindung geschlossen wird. Wenn angegeben und innerhalb des Timeoutzeitraums keine Nachrichten eingehen, wird eine leere Liste zurückgegeben.
Gibt zurück
Eine Liste der empfangenen Nachrichten. Wenn keine Nachrichten verfügbar sind, ist dies eine leere Liste.
Rückgabetyp
Ausnahmen
Beispiele
Empfangen von Nachrichten von ServiceBus.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Erneuern Sie die Nachrichtensperre.
Dadurch wird die Sperre für die Nachricht beibehalten, um sicherzustellen, dass sie nicht an die neu zu verarbeitende Warteschlange zurückgegeben wird.
Um die Nachricht abzuschließen (oder anderweitig zu begleichen), muss die Sperre beibehalten werden und kann nicht bereits abgelaufen sein. eine abgelaufene Sperre kann nicht verlängert werden.
Nachrichten, die über RECEIVE_AND_DELETE-Modus empfangen werden, sind nicht gesperrt und können daher nicht verlängert werden. Dieser Vorgang ist auch nur für nicht sitzungsbehaftete Nachrichten verfügbar.
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
Parameter
- message
- ServiceBusReceivedMessage
Die Nachricht, für die die Sperre verlängert werden soll.
Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.
Gibt zurück
Die utc datetime, bei der die Sperre abläuft.
Rückgabetyp
Ausnahmen
Beispiele
Erneuern Sie die Sperre für eine empfangene Nachricht.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Attribute
client_identifier
Rufen Sie den ServiceBusReceiver-Clientbezeichner ab, der dem Empfänger instance zugeordnet ist.
Rückgabetyp
session
Rufen Sie das ServiceBusSession-Objekt ab, das mit dem Empfänger verknüpft ist. Die Sitzung ist nur für sitzungsfähige Entitäten verfügbar. Sie würde Keine zurückgeben, wenn sie auf einem nicht sitzungsfähigen Empfänger aufgerufen wird.
Rückgabetyp
Beispiele
Abrufen einer Sitzung von einem Empfänger
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session