EventHubProducerClient Třída
Třída EventHubProducerClient definuje rozhraní vysoké úrovně pro odesílání událostí do služby Azure Event Hubs.
- Dědičnost
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
Konstruktor
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
Parametry
- fully_qualified_namespace
- str
Plně kvalifikovaný název hostitele pro obor názvů služby Event Hubs. Pravděpodobně se bude podobat souboru .servicebus.windows.net
- eventhub_name
- str
Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.
- credential
- AsyncTokenCredential nebo AzureSasCredential nebo AzureNamedKeyCredential
Objekt přihlašovacích údajů používaný k ověřování, který implementuje konkrétní rozhraní pro získávání tokenů. Přijímá EventHubSharedKeyCredentialobjekty přihlašovacích údajů nebo vygenerované knihovnou azure-identity a objekty, které implementují metodu *get_token(self, scopes).
- buffered_mode
- bool
Pokud je true, klient producenta bude shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.
Zpětné volání, které se má volat po úspěšném publikování dávky. Zpětné volání má dva parametry:
events: Seznam událostí, které byly úspěšně publikovány.
partition_id: ID oddílu, do kterého byly publikovány události v seznamu.
Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se , pokud má buffered_mode hodnotu True, zatímco volitelná, pokud má buffered_mode hodnotu False.
Zpětné volání, které se má volat, jakmile se nepodaří publikovat dávku. Vyžaduje se, pokud je hodnota v buffered_mode true, zatímco volitelná, pokud buffered_mode má hodnotu False. Funkce zpětného volání by měla být definována takto: on_error(události, partition_id, chyba), kde:
events: Seznam událostí, které se nepodařilo publikovat,
partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a
error: Výjimka související se selháním odesílání.
Pokud má buffered_mode hodnotu False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:
Pokud se během vytváření instance klienta producenta předává zpětné volání on_error ,
informace o chybě budou předány zpětnému volání on_error , které se pak bude volat.
Pokud se během vytváření instance klienta nepředá zpětné volání on_error,
pak bude ve výchozím nastavení vyvolána chyba.
Pokud má buffered_mode hodnotu True, vyžaduje se zpětné volání on_error a chyby se budou zpracovávat následujícím způsobem:
Pokud se události nepodaří zařadit do fronty v rámci daného časového limitu, dojde přímo k chybě.
Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .
- max_buffer_length
- int
Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.
Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.
- logging_enable
- bool
Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.
- auth_timeout
- float
Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.
- user_agent
- str
Pokud je zadaný, přidá se před řetězec uživatelského agenta.
- retry_total
- int
Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3.
- retry_backoff_factor
- float
Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.
- retry_backoff_max
- float
Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).
- retry_mode
- str
Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.
- idle_timeout
- float
Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.
- transport_type
- TransportType
Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.
- http_proxy
- dict
Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho mohou být k dispozici také následující klíče: 'username', 'password'.
Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.
Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().
- uamqp_transport
- bool
Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.
- socket_timeout
- float
Čas v sekundách, kdy by měl podkladový soket na připojení čekat při odesílání a přijímání dat před vypršením časového limitu. Výchozí hodnota je 0,2 pro TransportType.Amqp a 1 pro TransportType.AmqpOverWebsocket. Pokud k chybám EventHubsConnectionError dochází kvůli vypršení časového limitu zápisu, může být potřeba předat větší než výchozí hodnotu. To je pro pokročilé scénáře použití a obvykle by měla stačit výchozí hodnota.
Příklady
Vytvořte novou instanci EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metody
close |
Zavřete základní připojení a propojení AMQP klienta producenta. |
create_batch |
Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes. Max_size_in_bytes by neměla být větší než maximální povolená velikost zprávy definovaná službou. |
flush |
Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti. |
from_connection_string |
Vytvořte EventHubProducerClient z připojovací řetězec. |
get_buffered_event_count |
Počet událostí, které jsou v vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. Pro ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli TOTO ID oddílu v centru událostí skutečně existuje. |
get_eventhub_properties |
Získejte vlastnosti centra událostí. Mezi klíče ve vráceném slovníku patří:
|
get_partition_ids |
Získejte ID oddílů centra událostí. |
get_partition_properties |
Získá vlastnosti zadaného oddílu. Mezi klíče ve slovníku vlastností patří:
|
send_batch |
Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí události do místní vyrovnávací paměti a vrátí je. Producent provede automatické odesílání na pozadí. Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:
Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:
V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí. Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku. |
send_event |
Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí událost do místní vyrovnávací paměti a vrátí ji. Producent provede automatické dávkování a odeslání na pozadí. Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,
Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.
|
close
Zavřete základní připojení a propojení AMQP klienta producenta.
async close(*, flush: bool = True, **kwargs: Any) -> None
Parametry
- flush
- bool
Pouze režim vyrovnávací paměti. Pokud je nastavená hodnota True, události ve vyrovnávací paměti se odešlou okamžitě. Výchozí hodnota je True.
Pouze režim vyrovnávací paměti. Vypršení časového limitu pro zavření producenta. Výchozí hodnota je Žádná, což znamená žádný časový limit.
Návratový typ
Výjimky
Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.
Příklady
Zavřete obslužnou rutinu.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes.
Max_size_in_bytes by neměla být větší než maximální povolená velikost zprávy definovaná službou.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
Návratový typ
Výjimky
Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.
Příklady
Vytvoření objektu EventDataBatch v omezené velikosti
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti.
async flush(**kwargs: Any) -> None
Parametry
Vypršení časového limitu pro vyprázdnění událostí ve vyrovnávací paměti, výchozí hodnota je Žádná, což znamená žádný časový limit.
Návratový typ
Výjimky
Pokud se producentovi nepodaří vyprázdnit vyrovnávací paměť v rámci daného časového limitu v režimu vyrovnávací paměti.
from_connection_string
Vytvořte EventHubProducerClient z připojovací řetězec.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
Parametry
- eventhub_name
- str
Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.
- buffered_mode
- bool
Pokud je true, klient producenta bude shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.
Zpětné volání, které se má volat po úspěšném publikování dávky. Zpětné volání má dva parametry:
events: Seznam událostí, které byly úspěšně publikovány.
partition_id: ID oddílu, do kterého byly publikovány události v seznamu.
Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se, pokud má buffered_mode hodnotu True, zatímco volitelná, pokud má buffered_mode hodnotu False.
Zpětné volání, které se má volat, jakmile se nepodaří publikovat dávku. Funkce zpětného volání by měla být definována takto: on_error(události, partition_id, chyba), kde:
events: Seznam událostí, které se nepodařilo publikovat,
partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a
error: Výjimka související se selháním odesílání.
Pokud má buffered_mode hodnotu False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:
Pokud se během vytváření instance klienta producenta předává zpětné volání on_error ,
informace o chybě budou předány zpětnému volání on_error , které se pak bude volat.
Pokud se během vytváření instance klienta nepředá zpětné volání on_error,
pak bude ve výchozím nastavení vyvolána chyba.
Pokud má buffered_mode hodnotu True, vyžaduje se zpětné volání on_error a chyby se budou zpracovávat následujícím způsobem:
Pokud se události nepodaří zařadit do fronty v rámci daného časového limitu, dojde přímo k chybě.
Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .
- max_buffer_length
- int
Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.
Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.
- logging_enable
- bool
Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.
- http_proxy
- dict
Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho mohou být k dispozici také následující klíče: 'username', 'password'.
- auth_timeout
- float
Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.
- user_agent
- str
Pokud je zadaný, přidá se před řetězec uživatelského agenta.
- retry_total
- int
Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3.
- retry_backoff_factor
- float
Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.
- retry_backoff_max
- float
Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).
- retry_mode
- str
Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.
- idle_timeout
- float
Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.
- transport_type
- TransportType
Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.
Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.
Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().
- uamqp_transport
- bool
Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.
Návratový typ
Výjimky
Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.
Příklady
Z připojovací řetězec vytvořte novou instanci EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Počet událostí, které jsou v vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. Pro ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli TOTO ID oddílu v centru událostí skutečně existuje.
get_buffered_event_count(partition_id: str) -> int | None
Parametry
Návratový typ
Výjimky
Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.
get_eventhub_properties
Získejte vlastnosti centra událostí.
Mezi klíče ve vráceném slovníku patří:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Návraty
Slovník obsahující informace o centru událostí.
Návratový typ
Výjimky
get_partition_ids
Získejte ID oddílů centra událostí.
async get_partition_ids() -> List[str]
Návraty
Seznam ID oddílů.
Návratový typ
Výjimky
get_partition_properties
Získá vlastnosti zadaného oddílu.
Mezi klíče ve slovníku vlastností patří:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametry
Návraty
Dict vlastností oddílu.
Návratový typ
Výjimky
send_batch
Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí události do místní vyrovnávací paměti a vrátí je. Producent provede automatické odesílání na pozadí.
Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:
Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,
pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.
Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,
pak bude ve výchozím nastavení vyvolána chyba.
Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:
Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.
Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .
V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí.
Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Parametry
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
EventDataBatch objekt, který se má odeslat, nebo seznam EventData, který se má odeslat v dávce. Všechny události EventData nebo AmqpAnnotatedMessage v seznamu nebo EventDataBatch přijdou na stejný oddíl.
- timeout
- float
Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. V režimu vyrovnávací paměti je výchozí doba čekání Žádná.
- partition_id
- str
ID konkrétního oddílu, do který se má odeslat. Výchozí hodnota je None (Žádný). V takovém případě se služba přiřadí ke všem oddílům pomocí kruhového dotazování. Pokud zadáte partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná událost EventDataBatch má partition_id.
- partition_key
- str
S danou partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhoduje služba. Pokud zadáte partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná událost EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.
Návratový typ
Výjimky
Pokud hodnota určená parametrem časového limitu uplynou před událostí může být odeslána v režimu bez vyrovnávací paměti nebo události mohou být zařazení do vyrovnávací paměti v režimu vyrovnávací paměti.
Příklady
Asynchronně odesílá data událostí.
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí událost do místní vyrovnávací paměti a vrátí ji. Producent provede automatické dávkování a odeslání na pozadí.
Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Parametry
- event_data
- Union[EventData, AmqpAnnotatedMessage]
Objekt EventData , který se má odeslat.
- timeout
- float
Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. Ve vyrovnávacím režimu je výchozí doba čekání Žádná.
- partition_id
- str
KONKRÉTNÍ ID oddílu, do které se má odeslat. Výchozí hodnota je Žádný. V takovém případě služba přiřadí všechny oddíly pomocí kruhového dotazování. Pokud je zadána partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_id.
- partition_key
- str
S daným partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhodne služba. Pokud je zadána partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.
Návratový typ
Výjimky
Pokud hodnota určená parametrem časového limitu uplynou před odesláním události v režimu bez vyrovnávací paměti nebo události nelze zařadit do vyrovnávací paměti v režimu vyrovnávací paměti.
Atributy
total_buffered_event_count
Celkový počet událostí, které jsou aktuálně ve všech oddílech ve vyrovnávací paměti a čekají na publikování. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění.
Návratový typ
Azure SDK for Python