クイックスタート: Go を使用して Event Hubs との間でイベントを送受信する
Azure Event Hubs はビッグ データ ストリーミング プラットフォームであり、毎秒数百万のイベントを受け取って処理できるイベント インジェスト サービスです。 Event Hubs では、分散されたソフトウェアやデバイスから生成されるイベント、データ、またはテレメトリを処理および格納できます。 イベント ハブに送信されたデータは、任意のリアルタイム分析プロバイダーやバッチ処理/ストレージ アダプターを使用して、変換および保存できます。 Event Hubs の詳しい概要については、Event Hubs の概要と Event Hubs の機能に関するページをご覧ください。
このクイックスタートでは、イベント ハブとの間でイベントを送受信する Go アプリケーションを作成する方法について説明します。
注意
このクイックスタートは、https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs にある GitHub 上のサンプルに基づいています。 「イベントの送信」セクションは example_producing_events_test.go サンプルに基づいており、受信は example_processor_test.go サンプルに基づいています。 コードはクイックスタート用に簡略化されており、すべての詳細コメントは削除されているため、詳細と説明についてはサンプルを参照してください。
前提条件
このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。
- Go がローカルにインストールされていること。 必要に応じて、こちらの手順に従います。
- アクティブな Azure アカウントアカウントがない場合、Azure 試用版にサインアップして、最大 10 件の無料 Mobile Apps を入手できます。 Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
- Event Hubs 名前空間とイベント ハブを作成する。 Azure portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。
送信イベント
このセクションでは、イベント ハブにイベントを送信する Go アプリケーションの作成方法を説明します。
Go パッケージをインストールする
次の例に示すように、Event Hubs 用の Go パッケージを取得します。
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
イベント ハブにイベントを送信するためのコード
イベント ハブにイベントを送信するためのコードを次に示します。 このコードでの主な手順は次のとおりです。
- Event Hubs 名前空間への接続文字列とイベント ハブ名を使用して、Event Hubs のプロデューサー クライアントを作成します。
- バッチ オブジェクトを作成し、バッチにサンプル イベントを追加します。
- イベントのバッチをイベント ハブに送信します。
重要
NAMESPACE CONNECTION STRING
を Event Hubs 名前空間への接続文字列に置き換え、EVENT HUB NAME
をサンプル コード内のイベント ハブ名に置き換えます。
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
アプリケーションはまだ実行しないでください。 最初に受信側アプリを実行し、次に送信者アプリを実行する必要があります。
受信イベント
Storage アカウントとコンテナーの作成
パーティションのリースやイベント内のチェックポイントなどの状態は、Azure Storage コンテナーを使用して受信者間で共有されます。 Go SDK を使用してストレージ アカウントとコンテナーを作成できますが、「Azure ストレージ アカウントについて」の手順に従って作成することもできます。
チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。
- コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
- コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
- ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。
Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。
- 階層型名前空間
- BLOB の論理的な削除
- バージョン管理
Go パッケージ
メッセージを受信するには、次の例に示すように、Event Hubs 用の Go パッケージを取得します。
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
イベント ハブからイベントを受信するためのコード
イベント ハブからイベントを受信するためのコードを次に示します。 このコードでの主な手順は次のとおりです。
- イベント ハブでチェックポイント処理に使用される Azure Blob Storage を表すチェックポイント ストア オブジェクトを確認します。
- Event Hubs 名前空間への接続文字列とイベント ハブ名を使用して、Event Hubs のコンシューマー クライアントを作成します。
- クライアント オブジェクトとチェックポイント ストア オブジェクトを使用してイベント プロセッサを作成します。 プロセッサによってイベントが受信されて処理されます。
- イベント ハブ内のパーティションごとに、イベントを処理する関数として processEvents を使用してパーティション クライアントを作成します。
- すべてのパーティション クライアントを実行して、イベントを受信して処理します。
重要
次のプレースホルダーの値を実際の値に置き換えます。
AZURE STORAGE CONNECTION STRING
を、Azure ストレージ アカウントの接続文字列にBLOB CONTAINER NAME
を、ストレージ アカウントで作成した BLOB コンテナーの名前にNAMESPACE CONNECTION STRING
を、Event Hubs 名前空間への接続文字列にEVENT HUB NAME
を、サンプル コードのイベント ハブ名に
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
受信側と送信側アプリを実行する
まず、受信側アプリを実行します。
送信側アプリを実行します。
受信側ウィンドウに次の出力が表示されるまで 1 分間待ちます。
Processing 2 event(s) Event received with body hello Event received with body world
次の手順
https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs にある GitHub 上のサンプルを参照してください。