EventHubConsumerClient Klass
Klassen EventHubConsumerClient definierar ett gränssnitt på hög nivå för att ta emot händelser från Azure Event Hubs-tjänsten.
Huvudmålet med EventHubConsumerClient är att ta emot händelser från alla partitioner i en EventHub med belastningsutjämning och kontrollpunkter.
När flera EventHubConsumerClient-instanser körs mot samma händelsehubb, konsumentgrupp och kontrollpunktsplats fördelas partitionerna jämnt mellan dem.
För att aktivera belastningsutjämning och sparade kontrollpunkter måste checkpoint_store anges när eventHubConsumerClient skapas. Om det inte finns något kontrollpunktsarkiv bevaras kontrollpunkten internt i minnet.
En EventHubConsumerClient kan också ta emot från en specifik partition när du anropar metoden receive() eller receive_batch() och anger partition_id. Belastningsutjämning fungerar inte i enpartitionsläge. Men användarna kan fortfarande spara kontrollpunkter om checkpoint_store har angetts.
- Arv
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametrar
- fully_qualified_namespace
- str
Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Namnområdesformatet är: .servicebus.windows.net.
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- credential
- AsyncTokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Autentiseringsobjektet som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av biblioteket azure-identity och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).
- logging_enable
- bool
Om du vill mata ut nätverksspårningsloggar till loggaren. Standardvärdet är Falskt.
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total vid mottagning är speciell : Mottagningsmetoden implementeras av en while-loop som anropar intern mottagningsmetod i varje iteration. I mottagningsfallet anger retry_total antalet återförsök efter fel som utlöses av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så tillhandahålls) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge försätts alltid principen för återförsök i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} - 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att vara i viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal back off-tid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standardvärdet är TransportType.Amqp i vilket fall port 5671 används. Om porten 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
- http_proxy
HTTP-proxyinställningar. Det här måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde).
- checkpoint_store
- Optional[CheckpointStore]
En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktsarkiv behålls kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.
- load_balancing_interval
- float
När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.
- partition_ownership_expiration_interval
- float
Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, d.v.s. 180 sekunder när du använder standardvärdet load_balancing_interval på 30 sekunder.
- load_balancing_strategy
- str eller LoadBalancingStrategy
När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering hämtar så många oanmälda partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast kräver en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner, stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett strategi för belastningsutjämning. Girig strategi används som standard.
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle se ut så här: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används som standard port 443.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standardvärdet är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som den underliggande transporten.
- socket_timeout
- float
Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när data skickas och tas emot innan tidsgränsen nås. Standardvärdet är 0,2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel uppstår på grund av tidsgränsen för skrivning kan ett större värde än standardvärdet behöva skickas in. Detta är för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.
Exempel
Skapa en ny instans av EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metoder
close |
Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna. |
from_connection_string |
Skapa en EventHubConsumerClient från en anslutningssträng. |
get_eventhub_properties |
Hämta egenskaper för händelsehubben. Nycklar i den returnerade ordlistan är:
|
get_partition_ids |
Hämta partitions-ID:t för händelsehubben. |
get_partition_properties |
Hämta egenskaper för den angivna partitionen. Nycklar i egenskapsordlistan är:
|
receive |
Ta emot händelser från partitioner med valfri belastningsutjämning och kontrollpunkter. |
receive_batch |
Ta emot händelser från partitioner i batchar, med valfri belastningsutjämning och kontrollpunkter. |
close
Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.
async close() -> None
Returtyp
Exempel
Stäng klienten.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Skapa en EventHubConsumerClient från en anslutningssträng.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parametrar
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- logging_enable
- bool
Om du vill mata ut nätverksspårningsloggar till loggaren. Standardvärdet är Falskt.
- http_proxy
- dict
HTTP-proxyinställningar. Det här måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar finnas: "användarnamn", "lösenord".
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total vid mottagning är speciell : Mottagningsmetoden implementeras av en while-loop som anropar intern mottagningsmetod i varje iteration. I mottagningsfallet anger retry_total antalet återförsök efter fel som utlöses av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så tillhandahålls) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
- checkpoint_store
- Optional[CheckpointStore]
En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.
- load_balancing_interval
- float
När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.
- partition_ownership_expiration_interval
- float
Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 180 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.
- load_balancing_strategy
- str eller LoadBalancingStrategy
När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.
Returtyp
Exempel
Skapa en ny instans av EventHubConsumerClient från anslutningssträng.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Hämta egenskaper för händelsehubben.
Nycklar i den returnerade ordlistan är:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Returer
En ordlista som innehåller information om händelsehubben.
Returtyp
Undantag
get_partition_ids
Hämta partitions-ID:t för händelsehubben.
async get_partition_ids() -> List[str]
Returer
En lista över partitions-ID:t.
Returtyp
Undantag
get_partition_properties
Hämta egenskaper för den angivna partitionen.
Nycklar i egenskapsordlistan är:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametrar
Returer
En ordlista som innehåller partitionsegenskaper.
Returtyp
Undantag
receive
Ta emot händelser från partitioner med valfri belastningsutjämning och kontrollpunkter.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametrar
- on_event
- Callable[PartitionContext, Optional[EventData]]
Återanropsfunktionen för hantering av en mottagen händelse. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och händelse som är den mottagna händelsen. Återanropsfunktionen ska definieras som: on_event(partition_context, händelse). Detaljerad information om partitionskontext finns i PartitionContext.
- max_wait_time
- float
Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event motringning med Ingen. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän en händelse tas emot.
- partition_id
- str
Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.
- owner_level
- int
Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.
- prefetch
- int
Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.
- track_last_enqueued_event_properties
- bool
Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.
Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser.
Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.
- on_error
- Callable[[PartitionContext, Exception]]
Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.
- on_partition_initialize
- Callable[[PartitionContext]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras som: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.
Returtyp
Exempel
Ta emot händelser från EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Ta emot händelser från partitioner i batchar, med valfri belastningsutjämning och kontrollpunkter.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametrar
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Återanropsfunktionen för hantering av en batch med mottagna händelser. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och event_batch, vilket är de mottagna händelserna. Återanropsfunktionen ska definieras som: on_event_batch(partition_context, event_batch). event_batch kan vara en tom lista om max_wait_time inte är Ingen eller 0 och ingen händelse tas emot efter max_wait_time. Detaljerad information om partitionskontext finns i PartitionContext.
- max_batch_size
- int
Det maximala antalet händelser i en batch som skickas till motringning on_event_batch. Om det faktiska mottagna antalet händelser är större än max_batch_size delas de mottagna händelserna upp i batchar och anropar återanropet för varje batch med upp till max_batch_size händelser.
- max_wait_time
- float
Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event_batch motringning med en tom lista. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän händelser tas emot.
- partition_id
- str
Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.
- owner_level
- int
Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.
- prefetch
- int
Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.
- track_last_enqueued_event_properties
- bool
Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.
Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser.
Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.
- on_error
- Callable[[PartitionContext, Exception]]
Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.
- on_partition_initialize
- Callable[[PartitionContext]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras som: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.
Returtyp
Exempel
Ta emot händelser i batchar från EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python