Python 用Azure Event Grid クライアント ライブラリ - バージョン 4.16.0

Azure Event Grid は、パブリッシュ/サブスクライブ モデルを使用した画一的なイベントの使用を可能にする、完全に管理されたインテリジェントなイベント ルーティング サービスです。

ソースコード | パッケージ (PyPI) | パッケージ (Conda) | API リファレンス ドキュメント | 製品ドキュメント | サンプル | Changelog

作業の開始

前提条件

パッケージをインストールする

pip を使用して Python 用Azure Event Grid クライアント ライブラリをインストールします。

pip install azure-eventgrid
  • 既存の Event Grid トピックまたはドメインが必要です。 リソースは、Azure Portal または Azure CLI を使用して作成できます

Azure CLI を使用する場合は、 と <resource-name> を独自の一意の名前に置き換えます<resource-group-name>

Event Grid トピックを作成する

az eventgrid topic --create --location <location> --resource-group <resource-group-name> --name <resource-name>

Event Grid ドメインを作成する

az eventgrid domain --create --location <location> --resource-group <resource-group-name> --name <resource-name>

クライアントを認証する

Event Grid サービスを操作するには、クライアントのインスタンスを作成する必要があります。 クライアント オブジェクトをインスタンス化するには、 エンドポイント資格情報 が必要です。

Azure Active Directory (AAD) の使用

Azure Event Gridでは、要求の ID ベースの認証のために Azure Active Directory (Azure AD) との統合が提供されます。 Azure AD では、ロールベースのアクセス制御 (RBAC) を使用して、Azure Event Grid リソースへのアクセス権をユーザー、グループ、またはアプリケーションに付与できます。

を使用 TokenCredentialしてトピックまたはドメインにイベントを送信するには、認証された ID に "EventGrid データ送信者" ロールが割り当てられている必要があります。

パッケージを azure-identity 使用すると、開発環境と運用環境の両方で要求をシームレスに承認できます。 Azure Active Directory の詳細については、README に関するページをazure-identity参照してください。

たとえば、 を使用 DefaultAzureCredential して、Azure Active Directory を使用して認証するクライアントを構築できます。

from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)

エンドポイントの参照

トピック エンドポイントは、Azure portalの Event Grid トピック リソース内にあります。 これは次のようになります。 "https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"

AzureKeyCredential を使用してクライアントを作成する

Access キーをパラメーターとして credential 使用するには、キーを文字列として AzureKeyCredential のインスタンスに渡します。

メモ: アクセス キーは、Azure portal の Event Grid トピック リソースの [アクセス キー] メニューにあります。 また、Azure CLI または azure-mgmt-eventgrid ライブラリを使用して取得することもできます。 アクセス キーを取得するためのガイド については、こちらを参照してください

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)

メモ: クライアントは、 を使用して SAS 署名を使用 AzureSasCredentialして認証することもできます。 これを示すサンプルについては、 こちらを 参照してください (async_version)。

メモ: メソッドを generate_sas 使用して、共有アクセス署名を生成できます。 これを示すサンプルについては、こちらを参照 してください

主要な概念

トピック

トピックは、イベントを送信する EventGrid サービス内のチャネルです。 トピックが受け入れるイベント スキーマは、トピック作成時に決定されます。 スキーマの種類のイベントが、別のスキーマの種類を必要とするトピックに送信されると、エラーが発生します。

Domain

イベント ドメイン は、同じアプリケーションに関連する多数の Event Grid トピックの管理ツールです。 数千のトピックにイベントを発行できます。 ドメインでは、各トピックに対する承認と認証の制御も提供されます。 詳細については、「 イベント ドメインの概要」を参照してください。

イベント ドメインを作成すると、このドメインの発行エンドポイントが使用できるようになります。 このプロセスは、Event Grid トピックの作成に似ています。 唯一の違いは、ドメインに発行する場合は、イベントを配信するドメイン内のトピックを指定する必要があるということです。

イベント スキーマ

イベントは、システムで発生した何かを完全に説明する最小量の情報です。 カスタム トピックまたはドメインを作成するときは、イベントの発行時に使用するスキーマを指定する必要があります。

Event Grid では、イベントをエンコードするための複数のスキーマがサポートされています。

Event Grid スキーマ

カスタム スキーマを使用するようにトピックを構成することもできますが、既に定義されている Event Grid スキーマを使用する方が一般的です。 仕様と要件については、 こちらを参照してください

CloudEvents v1.0 スキーマ

もう 1 つのオプションは、CloudEvents v1.0 スキーマを使用することです。 CloudEvents は、一般的な方法でイベント データを記述するための仕様を生成する Cloud Native Computing Foundation プロジェクトです。 CloudEvents のサービスの概要については、 こちらを参照してください

EventGridPublisherClient

EventGridPublisherClient は、クライアントの初期化中に指定されたトピック ホスト名にイベント データを送信する操作を提供します。

トピックまたはドメインが使用するように構成されているスキーマに関係なく、 EventGridPublisherClient イベントを発行するために使用されます。 メソッドの send 発行イベントを使用します。

次の形式のイベントを送信できます。

  • 厳密に型指定された EventGridEvents のリストまたは単一インスタンス。

  • シリアル化された EventGridEvent オブジェクトの dict 表現。

  • 厳密に型指定された CloudEvents のリストまたは単一インスタンス。

  • シリアル化された CloudEvent オブジェクトのディクテーション表現。

  • 任意のカスタム スキーマのディクテーション表現。

詳細な例については、 サンプル を参照してください。

メモ: 発行する前に、トピックで CloudEvents または EventGridEvents がサポートされているかどうかを確認することが重要です。 送信するイベントのスキーマをサポートしていないトピックに送信すると、send() は例外をスローします。

システム トピック

Event Grid のシステム トピックは、Azure Storage や Azure Event Hubs などの Azure サービスによって発行された 1 つ以上のイベントを表します。 たとえば、システム トピックは、すべての BLOB イベントを表すか、特定のストレージ アカウントに対して発行された BLOB の作成イベントと BLOB 削除イベントのみを表します。

Azure Event Gridに発行されたシステム イベントのさまざまなイベントの種類の名前は、 でazure.eventgrid.SystemEventNames使用できます。 認識可能なシステム トピックの完全な一覧については、「 システム トピック」を参照してください。

Event Grid の主要な概念の詳細については、「Azure Event Gridの概念」を参照してください。

Azure Arc を使用した Kubernetes 上の Event Grid

Azure Arc を使用した Kubernetes 上の Event Grid は、独自の Kubernetes クラスターで Event Grid を実行できるオファリングです。 この機能は、Azure Arc 対応 Kubernetes を使用すると有効になります。 Azure Arc 対応 Kubernetes を使用すると、サポートされている Kubernetes クラスターが Azure に接続されます。 接続したら、そこに Event Grid をインストールすることができます。 詳細については、こちら を参照してください。

CNCF クラウド イベントのサポート

v4.7.0 以降、このパッケージでは、 からの https://pypi.org/project/cloudevents/CNCF クラウド イベントの発行もサポートされています。 このライブラリから API に CloudEvent オブジェクトを send 渡すことができます。


from cloudevents.http import CloudEvent

event = CloudEvent(...)

client.send(event)

次のセクションでは、次のような最も一般的な Event Grid タスクをカバーするいくつかのコード スニペットを示します。

Event Grid イベントを送信する

この例では、Event Grid イベントを発行します。

import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]

event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

クラウド イベントを送信する

この例では、Cloud イベントを発行します。

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

複数のイベントを送信する

トピックまたはドメインに複数のイベントを送信するときに、イベントをバッチとして送信できます。 この例では、send メソッドを使用して CloudEvents の一覧を送信します。

警告: 一度に複数のイベントのリストを送信する場合、各イベントを反復処理して送信しても、最適なパフォーマンスは得られない。 最適なパフォーマンスを得るには、イベントの一覧を送信することを強くお勧めします。

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)

events = [event0, event1]

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(events)

ディクショナリとしてイベントを送信する

シリアル化された各モデルのディクテーション表現を使用して、厳密に型指定されたオブジェクトとは別に CloudEvent または EventGridEvent を発行することもできます。

以下に示すように、dict のような表現を使用して、カスタム スキーマを使用してトピックに送信します。

import os
import uuid
import datetime as dt
from msrest.serialization import UTC
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]

event = custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": uuid.uuid4(),
    "customEventTime": dt.datetime.now(UTC()).isoformat(),
    "customData": "sample data"
    }

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

ストレージ キューから使用する

この例では、ストレージ キューから受信したメッセージを使用し、CloudEvent オブジェクトに逆シリアル化します。

from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json

# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']

with QueueServiceClient.from_connection_string(connection_str) as qsc:
    payload =  qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
        ).peek_messages()

    ## deserialize payload into a list of typed Events
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]

サービスバスから使用する

この例では、ServiceBus から受信したペイロード メッセージを使用し、EventGridEvent オブジェクトに逆シリアル化します。

from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json

# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    payload =  sb_client.get_queue_receiver(queue_name).receive_messages()

    ## deserialize payload into a list of typed Events
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]

EventGrid を使用した分散トレース

OpenTelemetry for Python は、Azure-core トレース統合と互換性があるため、EventGrid で通常どおり使用できます。

OpenTelemetry を使用して CloudEvent の送信をトレースする例を次に示します。

まず、EventGrid の有効なトレース プラグインとして OpenTelemetry を設定します。

from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan

settings.tracing_implementation = OpenTelemetrySpan

テレメトリの定期的な使用状況については、こちらを参照してください。 詳細については、「 OpenTelemetry 」を参照してください。 この例では、単純なコンソール エクスポーターを使用してトレースをエクスポートします。 任意の輸出者は、などを含むazure-monitor-opentelemetry-exporterjaegerzipkinここで使用することができます。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # this requires opentelemetry >= 1.0.0

# Simple console exporter
exporter = ConsoleSpanExporter()

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(exporter)
)

exportertracer設定されたら、次の例に従って、 の EventGridPublisherClient メソッドを使用して send CloudEvent オブジェクトを送信しながらトレースの収集を開始してください。

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential

hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source = 'demo',
    type = 'sdk.demo',
    data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)

トラブルシューティング

  • ロガーを有効 azure.eventgrid にして、ライブラリからトレースを収集します。

全般

Event Grid クライアント ライブラリでは、 Azure Core で定義されている例外が発生します。

ログの記録

このライブラリでは、標準の ログ記録 ライブラリを使用してログを記録します。 HTTP セッション (URL、ヘッダーなど) に関する基本情報は INFO レベルでログに記録されます。

オプションの構成

省略可能なキーワード (keyword)引数は、クライアントレベルと操作ごとのレベルで渡すことができます。 azure-core リファレンス ドキュメント では、再試行、ログ記録、トランスポート プロトコルなどの使用可能な構成について説明しています。

次の手順

次のセクションでは、Event Grid Python API で使用される一般的なパターンを示すコード スニペットをいくつか示します。

その他のサンプル コード

これらのコード サンプルは、Azure Event Grid クライアント ライブラリを使用した一般的なチャンピオン シナリオ操作を示しています。

次のサンプルでは、EventGridEvents と CloudEvents の公開と使用 dict の表現について説明します。

その他のサンプル については、こちらを参照してください

  • 送信シナリオに関連するその他のサンプル については、こちらを参照してください
  • さまざまなメッセージング サービスのペイロードを型指定されたオブジェクトとして使用することに関連するその他のサンプルについては、「サンプルを使用する」を参照してください。

その他のドキュメント

Azure Event Gridに関するより広範なドキュメントについては、docs.microsoft.com に関する Event Grid のドキュメントを参照してください

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、「 cla.microsoft.com」を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。