EventHubConsumerClient クラス
EventHubConsumerClient クラスは、Azure Event Hubs サービスからイベントを受信するための高レベルのインターフェイスを定義します。
EventHubConsumerClient のメイン目標は、負荷分散とチェックポイント処理を使用して EventHub のすべてのパーティションからイベントを受信することです。
同じイベント ハブ、コンシューマー グループ、チェックポイントの場所に対して複数の EventHubConsumerClient インスタンスが実行されている場合、パーティションはそれらの間で均等に分散されます。
負荷分散と永続化されたチェックポイントを有効にするには、 EventHubConsumerClient の作成時にcheckpoint_storeを設定する必要があります。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持されます。
EventHubConsumerClient は、メソッド receive() または receive_batch() を呼び出してpartition_idを指定するときに、特定のパーティションから受信することもできます。 負荷分散は、単一パーティション モードでは機能しません。 ただし、checkpoint_storeが設定されている場合でも、ユーザーはチェックポイントを保存できます。
- 継承
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
コンストラクター
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
パラメーター
- credential
- TokenCredential または AzureSasCredential または AzureNamedKeyCredential
トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- auth_timeout
- float
サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。
- user_agent
- str
指定した場合、これはユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信 におけるretry_total のコンテキストは特別です。receive メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループ内の内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされた場合は、エラー情報と共に on_error コールバックが呼び出されます (指定されている場合)。 失敗した内部パーティション コンシューマーは閉じられ (on_partition_close が指定されている場合は呼び出されます)、新しい内部パーティション コンシューマーが作成され (on_partition_initialize 指定 されている場合は呼び出されます)、受信を再開します。
- retry_backoff_factor
- float
2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。
- retry_backoff_max
- float
最大バックオフ時間。 既定値は 120 秒 (2 分) です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。
- idle_timeout
- float
タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。
- transport_type
- TransportType
Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- checkpoint_store
- CheckpointStore または None
イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内に内部的に保持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。
- load_balancing_interval
- float
負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。
- partition_ownership_expiration_interval
- float
パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり 30 秒の既定のload_balancing_intervalを使用する場合は 180 秒です。
- load_balancing_strategy
- str または LoadBalancingStrategy
負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を使用して、負荷分散の評価ごとに、負荷のバランスを取るために必要な解放されていないパーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED は、負荷分散の評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略に使用します。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。
Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。
- socket_timeout
- float
接続の基になるソケットが、データの送受信時にタイムアウトするまで待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値よりも大きい値を渡す必要がある場合があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。
例
EventHubConsumerClient の新しいインスタンスを作成します。
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
メソッド
close |
イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。 |
from_connection_string |
接続文字列から EventHubConsumerClient を作成します。 |
get_eventhub_properties |
イベント ハブのプロパティを取得します。 返されるディクショナリのキーは次のとおりです。
|
get_partition_ids |
イベント ハブのパーティション ID を取得します。 |
get_partition_properties |
指定したパーティションのプロパティを取得します。 プロパティ ディクショナリのキーは次のとおりです。
|
receive |
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。 |
receive_batch |
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。 |
close
イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。
close() -> None
の戻り値の型 :
例
クライアントを閉じます。
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
接続文字列から EventHubConsumerClient を作成します。
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
パラメーター
- eventhub_name
- str
クライアントを接続する特定のイベント ハブのパス。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- auth_timeout
- float
サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。
- user_agent
- str
指定した場合、これはユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信 におけるretry_total のコンテキストは特別です。receive メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループ内の内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされた場合は、エラー情報と共に on_error コールバックが呼び出されます (指定されている場合)。 失敗した内部パーティション コンシューマーは閉じられ (on_partition_close が指定されている場合は呼び出されます)、新しい内部パーティション コンシューマーが作成され (on_partition_initialize 指定 されている場合は呼び出されます)、受信を再開します。
- retry_backoff_factor
- float
2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s, 0.2s, 0.4s, ...] の再試行がスリープ状態になります。 既定値は 0.8 です。
- retry_backoff_max
- float
最大バックオフ時間。 既定値は 120 秒 (2 分) です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。
- idle_timeout
- float
タイムアウト (秒単位)。その後、このクライアントは、ファーサー アクティビティがない場合に基になる接続を閉じます。 既定では、値は None です。つまり、サービスによって開始されない限り、非アクティブのためクライアントはシャットダウンされません。
- transport_type
- TransportType
Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
- http_proxy
- dict
HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- checkpoint_store
- CheckpointStore または None
イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。
- load_balancing_interval
- float
負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 10 秒です。
- partition_ownership_expiration_interval
- float
パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり既定のload_balancing_interval 30 秒を使用する場合は 60 秒です。
- load_balancing_strategy
- str または LoadBalancingStrategy
負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を greedy 戦略に使用します。これは、負荷分散評価ごとに、負荷のバランスを取るために必要な数の未請求パーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED を使用して、負荷分散評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略を行います。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。
Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。
の戻り値の型 :
例
接続文字列から EventHubConsumerClient の新しいインスタンスを作成します。
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
イベント ハブのプロパティを取得します。
返されるディクショナリのキーは次のとおりです。
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
戻り値
イベント ハブに関する情報を含むディクショナリ。
の戻り値の型 :
例外
get_partition_ids
イベント ハブのパーティション ID を取得します。
get_partition_ids() -> List[str]
戻り値
パーティション ID の一覧。
の戻り値の型 :
例外
get_partition_properties
指定したパーティションのプロパティを取得します。
プロパティ ディクショナリのキーは次のとおりです。
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
パラメーター
戻り値
パーティション プロパティを含むディクショナリ。
の戻り値の型 :
例外
receive
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
パラメーター
- on_event
- callable[PartitionContext, EventData または None]
受信したイベントを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベントである イベント の 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event(partition_context, event)。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。
- max_wait_time
- float
イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合、 on_event コールバックは None で呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントを受信するまでコールバックは呼び出されません。
- partition_id
- str
指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。
- owner_level
- int
排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先順位を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。
- prefetch
- int
処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。
- track_last_enqueued_event_properties
- bool
コンシューマーが関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡するかどうかを示します。 パーティションの last-enqueued イベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、Event Hub クライアントを使用してパーティション プロパティの要求を定期的に行うことを考慮すると、通常は有利なトレードオフとなる、追加のネットワーク帯域幅消費が少なくなります。 既定では False に設定されています。
パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データが使用可能な場合は使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値として位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型は、str、int、または datetime.datetime です。 また、ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。 既定値は "@latest" です。
指定したstarting_positionが包括的 (=) であるかどうかを判断します (>>)。 包括の場合は True、排他的の場合は False。 これは、キーとしてパーティション ID を持つディクテーション、および特定のパーティションのstarting_positionが包括的かどうかを示す値として bool にすることができます。 これは、すべてのstarting_positionの 1 つのブール値にすることもできます。 既定値は False です。
- on_error
- callable[[PartitionContext, Exception]]
再試行が使い果たされた後、または負荷分散の処理中に受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー の 2 つのパラメーターを受け取ります。 負荷分散 の処理中にエラーが発生した場合、partition_contextは None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_event コールバック中に未処理の例外が発生した場合は、on_error コールバックも呼び出されます。
- on_partition_initialize
- callable[[PartitionContext]]
特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、障害が発生し、閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_initialize(partition_context)。
- on_partition_close
- callable[[PartitionContext, CloseReason]]
特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。
の戻り値の型 :
例
EventHub からイベントを受信します。
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
パラメーター
- on_event_batch
- callable[PartitionContext, list[EventData]]
受信したイベントのバッチを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベント である event_batch という 2 つのパラメーターを受け取ります。 コールバック関数は、 on_event_batch(partition_context、event_batch) のように定義する必要があります。 max_wait_time が None でも 0 でもなく、 max_wait_time 後にイベントが受信されない場合、 event_batchは空のリストになる可能性があります。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。
- max_batch_size
- int
コールバック on_event_batchに渡されるバッチ内のイベントの最大数。 実際に受信したイベント数が max_batch_sizeを超える場合、受信したイベントはバッチに分割され、最大 max_batch_size イベントを含む各バッチのコールバックが呼び出されます。
- max_wait_time
- float
イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合は、空のリストを 使用してon_event_batch コールバックが呼び出されます。
- partition_id
- str
指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。
- owner_level
- int
排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先度を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。
- prefetch
- int
処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。
- track_last_enqueued_event_properties
- bool
コンシューマーが、関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうかを示します。 パーティションの最後にエンキューされたイベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、イベント ハブ クライアントを使用してパーティション プロパティの要求を定期的に行う場合と見なされる場合、通常は有利なトレードオフとなるネットワーク帯域幅の使用量が少なくなります。 既定では False に設定されています。
パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データは、使用可能な場合に使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値としての位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型には、str、int、datetime.datetime を指定できます。 ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。 既定値は "@latest" です。
指定したstarting_positionが inclusive(=) であるかどうかを判断します (>>)。 インクルーシブの場合は True、排他の場合は False。 これは、パーティション ID をキーとして、値として bool を指定して、特定のパーティションのstarting_positionが包括的かどうかを示す dict にすることができます。 これは、すべてのstarting_positionに対して 1 つのブール値にすることもできます。 既定値は False です。
- on_error
- callable[[PartitionContext, Exception]]
再試行が使い果たされた後、または負荷分散の処理中に、受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー という 2 つのパラメーターを受け取ります。 負荷分散の 処理中にエラーが発生した場合は、partition_context None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_errorコールバックは、on_eventコールバック中にハンドルされない例外が発生した場合にも呼び出されます。
- on_partition_initialize
- callable[[PartitionContext]]
特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、失敗した閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは、 on_partition_initialize(partition_context)のように定義する必要があります。
- on_partition_close
- callable[[PartitionContext, CloseReason]]
特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。
の戻り値の型 :
例
EventHub からバッチでイベントを受信します。
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python