EventHubConsumerClient Classe
A classe EventHubConsumerClient define uma interface de alto nível para receber eventos do serviço Hubs de Eventos do Azure.
O principal objetivo do EventHubConsumerClient é receber eventos de todas as partições de um EventHub com balanceamento de carga e pontos de verificação.
Quando várias instâncias do EventHubConsumerClient estão em execução no mesmo hub de eventos, grupo de consumidores e localização de ponto de verificação, as partições serão distribuídas uniformemente entre elas.
Para ativar o balanceamento de carga e os pontos de verificação persistentes, checkpoint_store tem de ser definido ao criar o EventHubConsumerClient. Se não for fornecido um arquivo de pontos de verificação, o ponto de verificação será mantido internamente na memória.
Um EventHubConsumerClient também pode receber de uma partição específica quando chama o método receive() ou receive_batch() e especifica o partition_id. O balanceamento de carga não funcionará no modo de partição única. No entanto, os utilizadores ainda podem guardar pontos de verificação se a checkpoint_store estiver definida.
- Herança
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Construtor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parâmetros
- fully_qualified_namespace
- str
O nome de anfitrião completamente qualificado para o espaço de nomes dos Hubs de Eventos. O formato do espaço de nomes é: .servicebus.windows.net.
- credential
- AsyncTokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
O objeto de credencial utilizado para autenticação que implementa uma interface específica para obter tokens. Aceita objetos EventHubSharedKeyCredentialde credenciais ou gerados pela biblioteca de identidades do azure e objetos que implementam o método *get_token(auto, âmbitos ).
- logging_enable
- bool
Se pretende exportar registos de rastreio de rede para o logger. A predefinição é Falso.
- auth_timeout
- float
O tempo em segundos a aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será imposto nenhum tempo limite do cliente.
- user_agent
- str
Se for especificado, será adicionado à frente da cadeia de agente do utilizador.
- retry_total
- int
O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interno de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo while-loop. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor da partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.
- retry_backoff_factor
- float
Um fator de recuo a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender durante: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.
- retry_backoff_max
- float
O tempo máximo de folga. O valor predefinido é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".
- idle_timeout
- float
Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver mais atividade. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido a inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que é utilizada a porta 5671. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.
- http_proxy
Definições de proxy HTTP. Tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor str) e "proxy_port" (valor int).
- checkpoint_store
- Optional[CheckpointStore]
Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de ponto de verificação, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga é ativado. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é 30 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 180 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga é ativado, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver afirmado poucas partições, este cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.
O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço dos Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.
Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhum, caso em que certifi.where() será utilizado.
- uamqp_transport
- bool
Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.
- socket_timeout
- float
O tempo em segundos que o socket subjacente na ligação deve aguardar ao enviar e receber dados antes de exceder o tempo limite. O valor predefinido é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros eventHubsConnectionError estiverem a ocorrer devido ao tempo limite de escrita, poderá ter de ser transmitido um valor maior do que o predefinido. Isto destina-se a cenários de utilização avançada e, normalmente, o valor predefinido deve ser suficiente.
Exemplos
Crie uma nova instância do EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Métodos
close |
Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes. |
from_connection_string |
Crie um EventHubConsumerClient a partir de um cadeia de ligação. |
get_eventhub_properties |
Obtenha as propriedades do Hub de Eventos. As chaves no dicionário devolvido incluem:
|
get_partition_ids |
Obter IDs de partição do Hub de Eventos. |
get_partition_properties |
Obtenha as propriedades da partição especificada. As chaves no dicionário de propriedades incluem:
|
receive |
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais. |
receive_batch |
Receba eventos de partições em lotes, com balanceamento de carga e pontos de verificação opcionais. |
close
Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes.
async close() -> None
Tipo de retorno
Exemplos
Feche o cliente.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Crie um EventHubConsumerClient a partir de um cadeia de ligação.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parâmetros
- eventhub_name
- str
O caminho do Hub de Eventos específico ao qual ligar o cliente.
- logging_enable
- bool
Se pretende exportar registos de rastreio de rede para o logger. A predefinição é Falso.
- http_proxy
- dict
Definições de proxy HTTP. Tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor str) e "proxy_port" (valor int). Além disso, também podem estar presentes as seguintes chaves: "nome de utilizador", "palavra-passe".
- auth_timeout
- float
O tempo em segundos a aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será imposto nenhum tempo limite do cliente.
- user_agent
- str
Se for especificado, será adicionado à frente da cadeia de agente do utilizador.
- retry_total
- int
O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interno de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo while-loop. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor da partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.
- retry_backoff_factor
- float
Um fator de recuo a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender durante: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.
- retry_backoff_max
- float
O tempo máximo de folga. O valor predefinido é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".
- idle_timeout
- float
Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver mais atividade. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido a inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que é utilizada a porta 5671. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.
- checkpoint_store
- Optional[CheckpointStore]
Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de ponto de verificação, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga é ativado. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é 30 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 180 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga é ativado, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver afirmado poucas partições, este cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.
O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço dos Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.
Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhum, caso em que certifi.where() será utilizado.
- uamqp_transport
- bool
Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.
Tipo de retorno
Exemplos
Crie uma nova instância do EventHubConsumerClient a partir de cadeia de ligação.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Obtenha as propriedades do Hub de Eventos.
As chaves no dicionário devolvido incluem:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Devoluções
Um dicionário que contém informações sobre o Hub de Eventos.
Tipo de retorno
Exceções
get_partition_ids
Obter IDs de partição do Hub de Eventos.
async get_partition_ids() -> List[str]
Devoluções
Uma lista de IDs de partição.
Tipo de retorno
Exceções
get_partition_properties
Obtenha as propriedades da partição especificada.
As chaves no dicionário de propriedades incluem:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parâmetros
Devoluções
Um dicionário que contém propriedades de partição.
Tipo de retorno
Exceções
receive
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parâmetros
- on_event
- Callable[PartitionContext, Optional[EventData]]
A função de chamada de retorno para processar um evento recebido. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e o evento que é o evento recebido. A função de chamada de retorno deve ser definida como: on_event(partition_context, evento). Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event será chamada com Nenhum. Se este valor estiver definido como Nenhum ou 0 (a predefinição), a chamada de retorno não será chamada até que um evento seja recebido.
- partition_id
- str
Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.
- prefetch
- int
O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.
Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos.
Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.
- on_error
- Callable[[PartitionContext, Exception]]
A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.
Tipo de retorno
Exemplos
Receber eventos do EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Receba eventos de partições em lotes, com balanceamento de carga e pontos de verificação opcionais.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parâmetros
- on_event_batch
- Callable[PartitionContext, List[EventData]]
A função de chamada de retorno para processar um lote de eventos recebidos. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e event_batch, que são os eventos recebidos. A função de chamada de retorno deve ser definida como: on_event_batch(partition_context, event_batch). event_batch pode ser uma lista vazia se max_wait_time não for Nenhum nem 0 e nenhum evento for recebido após max_wait_time. Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.
- max_batch_size
- int
O número máximo de eventos num lote transmitido para chamada de retorno on_event_batch. Se o número real de eventos recebidos for maior do que max_batch_size, os eventos recebidos serão divididos em lotes e chamarão a chamada de retorno para cada lote com até max_batch_size eventos.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event_batch será chamada com uma lista vazia. Se este valor estiver definido como Nenhum ou 0 (a predefinição), a chamada de retorno não será chamada até que os eventos sejam recebidos.
- partition_id
- str
Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.
- prefetch
- int
O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.
Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos.
Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.
- on_error
- Callable[[PartitionContext, Exception]]
A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.
Tipo de retorno
Exemplos
Receber eventos em lotes do EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python