Event Hubs と .NET を使用して Atlas Kafka トピック メッセージを送受信する

このクイック スタートでは、 Atlas Kafka トピック イベントを送受信する方法について説明します。 Azure Event HubsAzure.Messaging.EventHubs .NET ライブラリを使用します。

前提条件

Event Hubs を初めて使用する場合は、このクイック スタートを完了する前に 、「Event Hubs の概要 」を参照してください。

このクイック スタートに従うには、特定の前提条件が必要です。

  • Microsoft Azure サブスクリプション。 Event Hubs を含む Azure サービスを使用するには、Azure サブスクリプションが必要です。 Azure アカウントをお持ちでない場合は、 無料試用版 にサインアップするか、 アカウントの作成時に MSDN サブスクライバー特典を使用できます。
  • Microsoft Visual Studio 2022。 Event Hubs クライアント ライブラリは、C# 8.0 で導入された新機能を利用します。 以前の C# バージョンでもライブラリを使用できますが、新しい構文は使用できません。 完全な構文を使用するには、 .NET Core SDK 3.0 以降と 言語バージョン を に latest設定してコンパイルすることをお勧めします。 Visual Studio 2019 より前のバージョンの Visual Studio を使用している場合、C# 8.0 プロジェクトのビルドに必要なツールはありません。 無料の Community エディションを含む Visual Studio 2022 は、 こちらからダウンロードできます。
  • アクティブな Microsoft Purview アカウント
  • メッセージを送受信するように Microsoft Purview アカウントで構成された Event Hubs:
    • アカウントは既に構成されている可能性があります。 Microsoft Purview アカウントは、[設定] の [Kafka 構成] のAzure portalでチェックできます。 まだ構成されていない場合は、 このガイドに従ってください

Microsoft Purview にメッセージを発行する

ATLAS_HOOK、Event Hubs Kafka トピックを使用して Microsoft Purview にイベントを送信する .NET Core コンソール アプリケーション 作成しましょう。

Microsoft Purview にメッセージを発行するには、 マネージド Event Hubs か、 フック構成を持つ少なくとも 1 つの Event Hubs が必要です。

Visual Studio プロジェクトを作成する

次に、Visual Studio で C# .NET コンソール アプリケーションを作成します。

  1. Visual Studio を起動します
  2. [スタート] ウィンドウで、[新しいプロジェクト>の作成] [コンソール アプリ (.NET Framework)] を選択します。 .NET バージョン 4.5.2 以降が必要です。
  3. [ プロジェクト名] に「 PurviewKafkaProducer」と入力します。
  4. [ 作成] を 選択してプロジェクトを作成します。

コンソール アプリケーションを作成する

  1. Visual Studio 2022 を起動します。
  2. [ 新しいプロジェクトの作成] を選択します
  3. [ 新しいプロジェクトの作成 ] ダイアログ ボックスで、次の手順を実行します。このダイアログ ボックスが表示されない場合は、メニューの [ ファイル ] を選択し、[ 新規作成] を選択し、[ プロジェクト] を選択します。
    1. プログラミング言語として [C# ] を選択します。
    2. アプリケーションの種類として [ コンソール ] を選択します。
    3. 結果の一覧から [ コンソール アプリ (.NET Core)] を選択します。
    4. さらに [次へ] を選択します。

Event Hubs NuGet パッケージを追加する

  1. メニューから [ツール>] [NuGet パッケージ マネージャーパッケージ マネージャー> コンソール] を選択します。

  2. 次のコマンドを実行して、 Azure.Messaging.EventHubs NuGet パッケージと Azure.Messaging.EventHubs.Producer NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

イベント ハブにメッセージを送信するコードを記述する

  1. Program.cs ファイルの先頭に次usingのステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Event Hubs Program 接続文字列と Event Hubs 名のクラスに定数を追加します。

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. メソッドを Mainasync Main のメソッドに置き換え、 を async ProduceMessage 追加して Microsoft Purview にメッセージをプッシュします。 詳細については、コードのコメントを参照してください。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. プロジェクトをビルドします。 エラーがないことを確認します。

  5. プログラムを実行し、確認メッセージを待ちます。

    注:

    詳細なコメントを含む完全なソース コードについては、GitHub のこのファイルを参照してください

Create Entity JSON メッセージを使用して 2 つの列を持つ SQL テーブルを作成するサンプル コード

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Microsoft Purview メッセージを受信する

次に、イベント プロセッサを使用してイベント ハブからメッセージを受信する .NET Core コンソール アプリケーションを記述する方法について説明します。 イベント プロセッサは、イベント ハブからの永続的なチェックポイントと並列受信を管理します。 これにより、イベントを受信するプロセスが簡略化されます。 Microsoft Purview からメッセージを受信するには、ATLAS_ENTITIES イベント ハブを使用する必要があります。

Microsoft Purview からメッセージを受信するには、 マネージド Event Hubs または Event Hubs通知構成が必要です。

警告

Event Hubs SDK では、使用可能な最新バージョンの Storage API が使用されます。 そのバージョンは、必ずしも Stack Hub プラットフォームで使用できるとは限りません。 Azure Stack Hub でこのコードを実行すると、使用している特定のバージョンをターゲットにしない限り、ランタイム エラーが発生します。 チェックポイント ストアとしてAzure Blob Storageを使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コードでそのバージョンをターゲットにします。

ストレージ サービスの使用可能な最も高いバージョンは、バージョン 2019-02-02 です。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で使用可能な最高バージョン (SDK のリリース時点で 2019-07-07) が使用されます。 Azure Stack Hub バージョン 2005 を使用している場合は、このセクションの手順に従うだけでなく、Storage サービス API バージョン 2019-02-02 を対象とするコードも追加する必要があります。 特定の Storage API バージョンをターゲットにする方法については、 GitHub のこのサンプルを参照してください。

Azure Storage と BLOB コンテナーを作成する

チェックポイント ストアとして Azure Storage を使用します。 Azure Storage アカウントを作成するには、次の手順に従います。

  1. Azure Storage アカウントを作成する

  2. BLOB コンテナーを作成する

  3. ストレージ アカウントの接続文字列を取得する

    接続文字列とコンテナー名を書き留めます。 受信コードで使用します。

レシーバーの Visual Studio プロジェクトを作成する

  1. ソリューション エクスプローラー ウィンドウで、EventHubQuickStart ソリューションを選択して長押し (または右クリック) し、[追加] をポイントして、[新しいプロジェクト] を選択します。
  2. [ コンソール アプリ (.NET Core)] を選択し、[ 次へ] を選択します。
  3. プロジェクト名「PurviewKafkaConsumer」と入力し、[作成] を選択します。

Event Hubs NuGet パッケージを追加する

  1. メニューから [ツール>] [NuGet パッケージ マネージャーパッケージ マネージャー> コンソール] を選択します。

  2. 次のコマンドを実行して、 Azure.Messaging.EventHubs NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs
    
  3. 次のコマンドを実行して、 Azure.Messaging.EventHubs.Processor NuGet パッケージをインストールします。

    Install-Package Azure.Messaging.EventHubs.Processor
    

Main メソッドを更新する

  1. Program.cs ファイルの先頭に次usingのステートメントを追加します。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Event Hubs Program 接続文字列とイベント ハブ名のクラスに定数を追加します。 角かっこ内のプレースホルダーを、イベント ハブとストレージ アカウント (アクセス キー - プライマリ接続文字列) の作成時に取得した実際の値に置き換えます。 が名前空間レベルの接続文字列であり、イベント ハブ文字列ではないことを確認 {Event Hubs namespace connection string} します。

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Microsoft Purview にメッセージを 送信するときは、ATLAS_ENTITIESをイベント ハブ名として使用します。

  3. メソッドを Mainasync Main のメソッドに置き換えます。 詳細については、コードのコメントを参照してください。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. 次に、次のイベント ハンドラー メソッドとエラー ハンドラー メソッドを クラスに追加します。

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. プロジェクトをビルドします。 エラーがないことを確認します。

    注:

    詳細なコメントを含む完全なソース コードについては、 GitHub のこのファイルを参照してください。

  6. 受信側アプリケーションを実行します。

Microsoft Purview から受信したメッセージの例

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

次の手順

GitHub のその他の例を確認してください。