Azure Event Hubs データ接続

Azure Event Hubs は、ビッグ データのストリーミング プラットフォームとなるイベント インジェスト サービスです。 Azure Data Explorer は、お客様が管理する Event Hubs からの継続的なインジェストを提供します。

Event Hubs のインジェスト パイプラインは、いくつかのステップで、Azure Data Explorer にイベントを転送します。 まず、Azure portal でイベント ハブを作成します。 次に、Azure Data Explorer でターゲット テーブルを作成し、指定されたingestion プロパティを使用してデータを取り込みます。 Event Hubs 接続では、 events ルーティングに注意する必要があります。 データは、イベント システム プロパティのマッピングに従って、選択したプロパティを使用して埋め込むことができます。 Event Hubs への接続を作成して、イベント ハブを作成しイベントを送信します。 このプロセスは、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

Azure Data Explorer でのデータ インジェストに関する一般的な情報については、「Azure Data Explorer のデータ インジェスト概要」を参照してください。

Azure Data Explorer のデータ接続認証オプション

  • マネージド ID ベースのデータ接続 (推奨): マネージド ID ベースのデータ接続を使用することは、データ ソースに接続するための最も安全な方法です。 データ ソースからデータをフェッチする機能を完全に制御できます。 マネージド ID を使用したデータ接続のセットアップには、次の手順が必要です。

    1. クラスターにマネージド ID を追加します
    2. データ ソースのマネージド ID にアクセス許可を付与します。 Azure Event Hubs からデータをフェッチするには、マネージド ID に Azure Event Hubs データ受信者のアクセス許可が必要です。
    3. ターゲット データベースで、マネージド ID ポリシー を設定します。
    4. マネージド ID 認証を使用してデータ接続を作成し、データをフェッチします。

    注意事項

    マネージド ID のアクセス許可がデータ ソースから削除されると、データ接続は機能しなくなり、そのデータ ソースからデータをフェッチできなくなります。

  • キーベースのデータ接続: データ接続に対しマネージド ID の認証が指定されていない場合、接続は既定として自動的にキーベースの認証に設定されます。 キーベースの接続では、リソース接続文字列 (Azure Event Hubs 接続文字列など) を使用してデータがフェッチされます。 Azure Data Explorer は、指定されたリソースのリソース接続文字列を取得し、それを安全に保存します。 その後、接続文字列を使用して、データ ソースからデータがフェッチされます。

    注意事項

    キーがローテーションされると、データ接続は機能しなくなり、データ ソースからデータをフェッチできなくなります。 この問題を解決するには、データ接続を更新または再作成します。

データ形式

Note

  • Event Hubs からのインジェストでは、RAW 形式はサポートされていません。
  • Azure Event Hubs スキーマ レジストリとスキーマレス Avro はサポートされていません。
  • gzip 圧縮アルゴリズムを使用してデータを圧縮できます。 Compressionインジェスト プロパティを使って動的に指定するか、静的なデータ接続設定で指定することができます。
  • バイナリ形式 (Avro、ApacheAvro、Parquet、ORC、W3CLOGFILE) では、データ圧縮はサポートされていません。
  • バイナリ形式と圧縮データでは、カスタム エンコードと埋め込みのシステム プロパティはサポートされていません。
  • バイナリ形式 (Avro、ApacheAvro、Parquet、ORC、W3CLOGFILE) とインジェスト マッピングを使用する場合、インジェスト マッピング定義内のフィールドの順序は、テーブル内の対応する列の順序と一致する必要があります。

Event Hubs のプロパティ

Azure Data Explorer では、次の Event Hubs プロパティがサポートされています。

Note

メタデータをイベントに関連付けるために使用される Event Hubs カスタム プロパティの取り込みはサポートされていません。 カスタム プロパティを取り込む必要がある場合は、イベント データの本文でそれらを送信します。 詳細については「カスタム プロパティの取り込み」を参照してください。

インジェストのプロパティ

インジェスト プロパティは、インジェスト プロセス、データのルーティング先と処理方法を指示します。 EventData.Properties を使用して、イベント インジェストのインジェスト プロパティを指定できます。 以下のプロパティを設定できます。

Note

プロパティ名は大文字と小文字が区別されます。

プロパティ 説明
データベース 大文字と小文字が区別される、ターゲット データベースの名前。 既定では、データはデータ接続に関連付けられているターゲット データベースに取り込まれます。 既定のデータベースをオーバーライドし、別のデータベースにデータを送信するには、このプロパティを使用します。 これを行うには、最初にマルチデータベース接続として接続を設定する必要があります。
テーブル 大文字と小文字が区別される、既存のターゲット テーブルの名前。 [Data Connection] ペインで設定された Table をオーバーライドします。
形式 データ形式。 [Data Connection] ペインで設定された Data format をオーバーライドします。
IngestionMappingReference 使用する既存のインジェスト マッピングの名前。 [Data Connection] ペインで設定された Column mapping をオーバーライドします。
圧縮 データ圧縮、None (既定値)、または gzip
[エンコード] データ エンコード (既定値は UTF8)。 .NET でサポートされているエンコードのいずれかを指定できます。
タグ JSON 配列文字列として書式設定された、取り込まれたデータに関連付けられるタグの一覧。 タグを使用すると、パフォーマンスに影響します。
RawHeaders イベント ソースが Kafka で、他のルーティング プロパティを読み取るには Azure Data Explorer でバイト配列の逆シリアル化を使用する必要があることを示します。 値は無視されます。

Note

カスタム取得の開始日が指定されていない限り、データ接続の作成後にエンキューされたイベントのみが取り込まれます。 いずれの場合も、ルックバック期間は実際のイベント ハブのリテンション期間を超えることはできません。

イベント ルーティング

クラスターへのデータ接続を作成するときに、取り込まれたデータの送信先のルーティングを指定できます。 既定のルーティングは、ターゲット データベースに関連付けられている接続文字列で指定されたターゲット テーブルです。 データに対する既定のルーティングは、"静的ルーティング" と呼ばれることもあります。 前の段落で説明した 1 つ以上のイベント データ プロパティを設定することで、データの代替ルーティングと処理のオプションを指定できます。

Note

Event Hubs データ接続は、Event Hub から読み取ったすべてのイベントの処理を試みます。また、何らかの理由で処理できないすべてのイベントは、インジェストエラーとして報告されます。 Azure Data Explorer のインジェストを監視する方法については こちらを参照してください。

イベント データを別のデータベースにルーティングする

代替データベースへのデータのルーティングは、既定では無効になっています。 データを別のデータベースに送信するには、最初に、マルチデータベース接続として接続を設定する必要があります。 この機能は、Azure portal Azure ポータルC# または Python 管理 SDK、または ARM テンプレートを使用して有効にすることができます。 データベース ルーティングを許可するために使用されるユーザー、グループ、サービス プリンシパル、またはマネージド ID には、少なくともクラスターに対する共同作成者ロールと書き込みアクセス許可が必要です。

別のデータベースを指定するには、データベースインジェスト プロパティを設定します。

警告

マルチデータベースのデータ接続として接続を設定せずに別のデータベースを指定すると、インジェストが失敗します。

イベント データを別のテーブルにルーティングする

各イベントに別のテーブルを指定するには、TableFormatCompression、およびインジェストのプロパティのマッピングを設定します。 EventData.Properties の指定に従って、取り込まれたデータが接続で動的にルーティングされ、このイベントの静的プロパティがオーバーライドされます。

次の例では、イベント ハブの詳細を設定し、気象メトリック データを代替データベース (MetricsDB) とテーブル (WeatherMetrics) に送信する方法を示します。 データは JSON 形式で、mapping1 はテーブル WeatherMetrics で事前定義されています。

// This sample uses Azure.Messaging.EventHubs which is a .Net Framework library.
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { TimeStamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);

Event Hubs システム プロパティのマッピング

システム プロパティは、イベントがエンキューされた時点で Event Hubs サービスによって設定されるフィールドです。 Azuer Data Explorer Event Hubs データ接続では、特定のマッピングに基づいて、テーブルに取り込まれたデータに、定義済みのシステム プロパティのセットを埋め込むことができます。

Note

  • システム プロパティの埋め込みは、json および表形式 (つまり JSONMultiJSONCSVTSVPSVSCsvSOHsvTSVE) でサポートされています。
  • サポートされていない形式 (例: TXT や、ParquetAvro などの 圧縮形式) を使用してもデータは取り込まれますが、プロパティは無視されます。
  • イベント ハブ メッセージの圧縮が設定されている場合、システム プロパティの埋め込みはサポートされません。 このようなシナリオでは、該当するエラーが生成され、データは取り込まれません。
  • 表形式データの場合、システム プロパティは単一レコードのイベント メッセージでのみサポートされます。
  • json データの場合、システム プロパティは複数レコードのイベント メッセージでもサポートされます。 このような場合、システム プロパティは、イベント メッセージの最初のレコードにのみ追加されます。
  • CSV マッピングの場合、プロパティは、データ接続の作成に記載されている順序でレコードの先頭に追加されます。 将来変更される可能性があるため、これらのプロパティの順序に依存しないでください。
  • JSON マッピングの場合、システム プロパティの表のプロパティ名に従ってプロパティが追加されます。

Event Hubs サービスでは、次のシステム プロパティが公開されます。

プロパティ データ型 説明
x-opt-enqueued-time datetime イベントがエンキューされた UTC 時刻
x-opt-sequence-number long イベント ハブのパーティション ストリーム内のイベントの論理シーケンス番号
x-opt-offset string イベント ハブのパーティション ストリームからのイベントのオフセット。 このオフセット識別子は、イベント ハブ ストリームのパーティション内で一意です
x-opt-publisher string 発行元の名前 (発行元のエンドポイントにメッセージが送信された場合)
x-opt-partition-key string イベントが格納されている、対応するパーティションのパーティション キー

IoT Central イベント ハブを操作するときに、IoT Hub のシステム プロパティーをペイロードに埋め込むこともできます。 完全な一覧については、IoT Hub のシステム プロパティに関する記事を参照してください。

テーブルの [データ ソース] セクションで [イベント システムのプロパティ] を選択した場合は、テーブルのスキーマとマッピングにプロパティを含める必要があります。

スキーマ マッピングの例

テーブル スキーマ マッピングの例

データに 3 つの列 (TimeStampMetricName、および Value) が含まれており、含めるプロパティが x-opt-enqueued-time および x-opt-offset の場合は、次のコマンドを使用してテーブル スキーマを作成または変更します。

    .create-merge table TestTable (TimeStamp: datetime, MetricName: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV マッピングの例

次のコマンドを実行して、レコードの先頭にデータを追加します。 序数値に注意してください。

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "TimeStamp", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "MetricName", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON マッピングの例

データは、システム プロパティのマッピングを使用して追加されます。 これらのコマンドを実行します。

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "TimeStamp", "Properties":{"Path":"$.TimeStamp"}},'
    '    { "column" : "MetricName", "Properties":{"Path":"$.MetricName"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.Value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Event Hubs Capture Avro ファイルのスキーマ マッピング

Event Hubs データを使う方法の 1 つは、Azure Blob Storage または Azure Data Lake Storage の Azure Event Hubs を通じてイベントをキャプチャすることです。 その後、Azure Data Explorer の Event Grid データ接続を使って、書き込まれたキャプチャ ファイルを取り込むことができます。

キャプチャしたファイルのスキーマは、Event Hubs に送信された元のイベントのスキーマとは異なります。 この違いを考慮した上で、宛先テーブルのスキーマを設計する必要があります。 具体的には、イベントのペイロードはキャプチャ ファイルではバイト配列で表され、この配列は Event Grid の Azure Data Explorer データ接続では自動的にデコードされません。 Event Hubs Avro キャプチャ データのファイル スキーマの詳細については、「 キャプチャされた Avro ファイルを Azure Event Hubs で展開する」を参照してください。

イベント ペイロードを正しくデコードするには:

  1. キャプチャしたイベントの Body フィールドを、宛先テーブルの型 dynamic の列にマップします。
  2. unicode_codepoints_to_string() 関数を使ってバイト配列を読み取り可能な文字列に変換する更新ポリシーを適用します。

カスタム プロパティの取り込み

Event Hubs からイベントを取り込む場合、データはイベント データ オブジェクトの body セクションから取得されます。 ただし、Event Hubs の カスタム プロパティは、オブジェクトの properties セクションで定義され、取り込まれません。 顧客のプロパティを取り込むには、オブジェクトのbody セクションのデータに埋め込む必要があります。

次の例では、Event Hubs (左側) によって "定義された" カスタムプロパティ customProperty を含むイベント データ オブジェクトと、インジェスト (右側) に必要な "埋め込み" プロパティを比較しています。

{
"body":{
"value": 42
},
"properties":{
"customProperty": "123456789"
}
}
{
"body":{
"value": 42,
"customProperty": "123456789"
}
}

次のいずれかの方法を使用して、イベント データ オブジェクトの body セクション内のデータにカスタム プロパティを埋め込むことができます。

  • Event Hubs では、イベント データ オブジェクトを作成するときに、オブジェクトの body セクションのデータの一部としてカスタム プロパティを埋め込みます。
  • Azure Stream Analytics を使用して、イベント ハブからのイベントを処理し、カスタム プロパティをイベント データに埋め込みます。 Azure Stream Analytics から、Azure Data Explorer 出力コネクタを使用してデータをネイティブに取り込んだり、データを別のイベント ハブにルーティングして、そこから自分のクラスターにルーティングしたりすることができます。
  • Azure Functions を使用して、カスタム プロパティを追加し、データを取り込みます。

リージョン間 Event Hubs データ接続

最適なパフォーマンスを得るには、クラスターと同じリージョンに次のすべてのリソースを作成します。 他の方法がない場合は、Premium または Dedicated Event Hubs レベル使用することを検討してください。 Event Hubs レベルの比較については、こちらを参照してください。

イベント ハブの作成

まだ用意していない場合は、イベント ハブを作成します。 イベント ハブへの接続は、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

Note

  • イベント ハブの作成後にパーティションを動的に追加する機能は、Event Hubs の Premium と Dedicated レベルでのみ使用できます。 パーティション数を選択する場合は、長期間にわたるスケールを考慮してください。
  • コンシューマー グループは、コンシューマーごとに一意である "必要があります"。 Azure Data Explorer 接続専用のコンシューマー グループを作成します。

送信イベント

データを生成してイベント ハブに送信するサンプル アプリをご覧ください。

geo ディザスター リカバリー ソリューションの設定

イベント ハブには、geo ディザスター リカバリー ソリューションが用意されています。 Azure Data Explorer では Alias イベント ハブ名前空間がサポートされていません。 ソリューションに geo ディザスター リカバリーを実装するには、2 つのイベント ハブ データ接続を作成します。1 つはプライマリ名前空間用で、もう 1 つはセカンダリ名前空間用です。 Azure Data Explorer は、両方のイベント ハブ接続をリッスンします。

Note

プライマリ名前空間からセカンダリ名前空間へのフェールオーバーを実装するのは、ユーザーの責任です。