EventHubConsumerClient Klass

Klassen EventHubConsumerClient definierar ett gränssnitt på hög nivå för att ta emot händelser från Azure Event Hubs-tjänsten.

Huvudmålet med EventHubConsumerClient är att ta emot händelser från alla partitioner i en EventHub med belastningsutjämning och kontrollpunkter.

När flera EventHubConsumerClient-instanser körs mot samma händelsehubb, konsumentgrupp och kontrollpunktsplats fördelas partitionerna jämnt mellan dem.

För att aktivera belastningsutjämning och sparade kontrollpunkter måste checkpoint_store anges när eventHubConsumerClient skapas. Om det inte finns något kontrollpunktsarkiv bevaras kontrollpunkten internt i minnet.

En EventHubConsumerClient kan också ta emot från en specifik partition när du anropar metoden receive() eller receive_batch() och anger partition_id. Belastningsutjämning fungerar inte i enpartitionsläge. Men användarna kan fortfarande spara kontrollpunkter om checkpoint_store har angetts.

Arv
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametrar

fully_qualified_namespace
str
Obligatorisk

Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Namnområdesformatet är: .servicebus.windows.net.

eventhub_name
str
Obligatorisk

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

consumer_group
str
Obligatorisk

Ta emot händelser från händelsehubben för den här konsumentgruppen.

credential
AsyncTokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Obligatorisk

Autentiseringsobjektet som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av biblioteket azure-identity och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).

logging_enable
bool

Om du vill mata ut nätverksspårningsloggar till loggaren. Standardvärdet är Falskt.

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total vid mottagning är speciell : Mottagningsmetoden implementeras av en while-loop som anropar intern mottagningsmetod i varje iteration. I mottagningsfallet anger retry_total antalet återförsök efter fel som utlöses av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så tillhandahålls) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge försätts alltid principen för återförsök i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} - 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att vara i viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal back off-tid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standardvärdet är TransportType.Amqp i vilket fall port 5671 används. Om porten 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

http_proxy

HTTP-proxyinställningar. Det här måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde).

checkpoint_store
Optional[CheckpointStore]

En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktsarkiv behålls kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.

load_balancing_interval
float

När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.

partition_ownership_expiration_interval
float

Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, d.v.s. 180 sekunder när du använder standardvärdet load_balancing_interval på 30 sekunder.

load_balancing_strategy
str eller LoadBalancingStrategy

När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering hämtar så många oanmälda partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast kräver en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner, stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett strategi för belastningsutjämning. Girig strategi används som standard.

custom_endpoint_address
Optional[str]

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle se ut så här: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används som standard port 443.

connection_verify
Optional[str]

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standardvärdet är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som den underliggande transporten.

socket_timeout
float

Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när data skickas och tas emot innan tidsgränsen nås. Standardvärdet är 0,2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel uppstår på grund av tidsgränsen för skrivning kan ett större värde än standardvärdet behöva skickas in. Detta är för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.

Exempel

Skapa en ny instans av EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metoder

close

Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.

from_connection_string

Skapa en EventHubConsumerClient från en anslutningssträng.

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Ta emot händelser från partitioner med valfri belastningsutjämning och kontrollpunkter.

receive_batch

Ta emot händelser från partitioner i batchar, med valfri belastningsutjämning och kontrollpunkter.

close

Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.

async close() -> None

Returtyp

Exempel

Stäng klienten.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Skapa en EventHubConsumerClient från en anslutningssträng.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parametrar

conn_str
str
Obligatorisk

En händelsehubbs anslutningssträng.

consumer_group
str
Obligatorisk

Ta emot händelser från händelsehubben för den här konsumentgruppen.

eventhub_name
str

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

logging_enable
bool

Om du vill mata ut nätverksspårningsloggar till loggaren. Standardvärdet är Falskt.

http_proxy
dict

HTTP-proxyinställningar. Det här måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar finnas: "användarnamn", "lösenord".

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total vid mottagning är speciell : Mottagningsmetoden implementeras av en while-loop som anropar intern mottagningsmetod i varje iteration. I mottagningsfallet anger retry_total antalet återförsök efter fel som utlöses av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så tillhandahålls) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

checkpoint_store
Optional[CheckpointStore]

En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.

load_balancing_interval
float

När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.

partition_ownership_expiration_interval
float

Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 180 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.

load_balancing_strategy
str eller LoadBalancingStrategy

När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.

custom_endpoint_address
Optional[str]

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.

connection_verify
Optional[str]

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.

Returtyp

Exempel

Skapa en ny instans av EventHubConsumerClient från anslutningssträng.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Returer

En ordlista som innehåller information om händelsehubben.

Returtyp

Undantag

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

async get_partition_ids() -> List[str]

Returer

En lista över partitions-ID:t.

Returtyp

Undantag

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametrar

partition_id
str
Obligatorisk

Målpartitions-ID: t.

Returer

En ordlista som innehåller partitionsegenskaper.

Returtyp

Undantag

receive

Ta emot händelser från partitioner med valfri belastningsutjämning och kontrollpunkter.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametrar

on_event
Callable[PartitionContext, Optional[EventData]]
Obligatorisk

Återanropsfunktionen för hantering av en mottagen händelse. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och händelse som är den mottagna händelsen. Återanropsfunktionen ska definieras som: on_event(partition_context, händelse). Detaljerad information om partitionskontext finns i PartitionContext.

max_wait_time
float

Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event motringning med Ingen. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän en händelse tas emot.

partition_id
str

Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.

owner_level
int

Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.

prefetch
int

Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.

track_last_enqueued_event_properties
bool

Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.

starting_position
str, int, datetime eller dict[str,any]

Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser.

starting_position_inclusive
bool eller dict[str,bool]

Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.

on_error
Callable[[PartitionContext, Exception]]

Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.

on_partition_initialize
Callable[[PartitionContext]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras som: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.

Returtyp

Exempel

Ta emot händelser från EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Ta emot händelser från partitioner i batchar, med valfri belastningsutjämning och kontrollpunkter.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametrar

on_event_batch
Callable[PartitionContext, List[EventData]]
Obligatorisk

Återanropsfunktionen för hantering av en batch med mottagna händelser. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och event_batch, vilket är de mottagna händelserna. Återanropsfunktionen ska definieras som: on_event_batch(partition_context, event_batch). event_batch kan vara en tom lista om max_wait_time inte är Ingen eller 0 och ingen händelse tas emot efter max_wait_time. Detaljerad information om partitionskontext finns i PartitionContext.

max_batch_size
int

Det maximala antalet händelser i en batch som skickas till motringning on_event_batch. Om det faktiska mottagna antalet händelser är större än max_batch_size delas de mottagna händelserna upp i batchar och anropar återanropet för varje batch med upp till max_batch_size händelser.

max_wait_time
float

Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event_batch motringning med en tom lista. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän händelser tas emot.

partition_id
str

Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.

owner_level
int

Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.

prefetch
int

Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.

track_last_enqueued_event_properties
bool

Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.

starting_position
str, int, datetime eller dict[str,any]

Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser.

starting_position_inclusive
bool eller dict[str,bool]

Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.

on_error
Callable[[PartitionContext, Exception]]

Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.

on_partition_initialize
Callable[[PartitionContext]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras som: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.

Returtyp

Exempel

Ta emot händelser i batchar från EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )