演習 - トピックを使用してメッセージを送受信する

完了

あなたは、Azure Service Bus トピックを使用して、セールスフォース アプリケーションで販売実績メッセージを配布することにしました。 営業担当者は、このアプリを自分のモバイル デバイスで使用して、各エリアと期間ごとの売上高をまとめたメッセージを送信します。 これらのメッセージは、南北アメリカやヨーロッパなど会社の営業地域内にある Web サービスに配信されます。

トピックに必要なインフラストラクチャは、お使いの Azure サブスクリプションに既に実装されています。 ここでは、トピックにメッセージを送信するコードと、サブスクリプションからメッセージを取得するコードを記述します。 次に、メッセージをトピックに送信し、特定のサブスクリプションのメッセージを取得します。

Azure Cloud Shell で次のコマンドを実行して、正しいディレクトリで作業していることを確認します。

cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start
code .

トピックにメッセージを送信するコードを記述する

販売業績に関するメッセージを送信するコンポーネントを完成させるには、次の手順のようにします。

  1. Azure Cloud Shell エディターで、performancemessagesender/Program.cs を開き、次のコード行を見つけます。

    const string ServiceBusConnectionString = "";
    

    引用符の間に、前の演習で保存した接続文字列を貼り付けます。

  2. キュー名に salesperformancemessages とは異なる名前を使用した場合は、コード内の TopicName プロパティの値を更新します。

    const string TopicName = "salesperformancemessages";
    
  3. SendPerformanceMessageAsync() メソッドを見つけます。 (ヒント: 26 行目またはその付近にあります)。そのメソッド内で、次のコード行を見つけます。

    // Create a Service Bus client here
    

    そのコード行を次のコードに置き換えます。

    // By leveraging "await using", the DisposeAsync method will be called automatically when the client variable goes out of scope.
    // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program.
    await using var client = new ServiceBusClient(ServiceBusConnectionString);
    
  4. SendPerformanceMessageAsync() メソッド内で、次のコード行を見つけます。

    // Create a sender here
    

    そのコード行を次のコードに置き換えます。

    await using ServiceBusSender sender = client.CreateSender(TopicName);
    
  5. try...catch ブロック内で、次のコード行を見つけます。

    // Create and send a message here
    

    そのコード行を次のコードに置き換えます。

    string messageBody = "Total sales for Brazil in August: $13m.";
    var message = new ServiceBusMessage(messageBody);
    
  6. コンソールでメッセージを表示するには、次の行に以下のコードを挿入します。

    Console.WriteLine($"Sending message: {messageBody}");
    
  7. トピックにメッセージを送信するには、次の行に以下のコードを挿入します。

    await sender.SendMessageAsync(message);
    
  8. 最後のコードが次の例のようになっていることを確認します。

    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace performancemessagesender
    {
        class Program
        {
            const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx";
            const string TopicName = "salesperformancemessages";
    
            static void Main(string[] args)
            {
                Console.WriteLine("Sending a message to the Sales Performance topic...");
                SendPerformanceMessageAsync().GetAwaiter().GetResult();
                Console.WriteLine("Message was sent successfully.");
            }
    
            static async Task SendPerformanceMessageAsync()
            {
                // By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope.
                // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program.
                await using var client = new ServiceBusClient(ServiceBusConnectionString);
    
                await using ServiceBusSender sender = client.CreateSender(TopicName);
    
                try
                {
                    string messageBody = "Total sales for Brazil in August: $13m.";
                    var message = new ServiceBusMessage(messageBody);
                    Console.WriteLine($"Sending message: {messageBody}");
                    await sender.SendMessageAsync(message);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
                }
            }
        }
    }
    
  9. 変更を保存するには、Ctrl + S キーを押します。また、エディターを閉じるには、Ctrl + Q キーを押します。

メッセージをトピックに送信する

  1. 販売に関するメッセージを送信するコンポーネントを実行するには、Cloud Shell で次のコマンドを実行します。

    dotnet run --project performancemessagesender
    
  2. プログラムを実行したら、メッセージが送信されていることを示す通知を Cloud Shell で監視します。 アプリを実行するたびに、別のメッセージがトピックに追加され、各サブスクリプションでコピーが使用できるようになります。

    Sending a message to the Sales Performance topic...
    Sending message: Total sales for Brazil in August: $13m.
    Message was sent successfully.
    

サブスクリプションのメッセージを取得する前に、メッセージ数を調べる

Message was sent successfully と表示されたら、次のコマンドを実行して、Americas サブスクリプション内のメッセージの数を確認します。 <namespace-name> を Service Bus 名前空間に置き換えることを忘れないでください。

az servicebus topic subscription show \
    --resource-group <rgn>[sandbox resource group name]</rgn> \
    --topic-name salesperformancemessages \
    --name Americas \
    --query messageCount \
    --namespace-name <namespace-name>

AmericasEuropeAndAsia に置き換え、コマンドをもう一度実行すると、両方のサブスクリプションに同じ数のメッセージがあることがわかります。

サブスクリプションのトピック メッセージを取得するコードを記述する

販売業績に関するメッセージを取得するコンポーネントを作成するには、次の手順のようにします。

  1. code . を実行してエディターを起動します。

  2. エディターで、performancemessagereceiver/Program.cs を開き、次のコード行を見つけます。

    const string ServiceBusConnectionString = "";
    

    引用符の間に、前の演習で保存した接続文字列を貼り付けます。

  3. Service Bus クライアントを作成するには、MainAsync() メソッドを見つけます。 そのメソッド内で、次のコード行を見つけます。

    // Create a Service Bus client that will authenticate using a connection string
    

    その行を次のコードに置き換えます。

    var client = new ServiceBusClient(ServiceBusConnectionString);
    
  4. メッセージ処理オプションを構成するには、次のコード行を見つけます。

    // Create the options to use for configuring the processor
    

    その行を次のコードに置き換えます。

    var processorOptions = new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls = 1,
        AutoCompleteMessages = false
    };
    
  5. プロセッサを作成するには、次のコード行を見つけます。

    // Create a processor that we can use to process the messages
    

    その行を次のコードに置き換えます。

    ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions);
    
  6. ハンドラーを構成するには、次のコード行を見つけます。

    // Configure the message and error handler to use
    

    その行を次のコードに置き換えます。

    processor.ProcessMessageAsync += MessageHandler;
    processor.ProcessErrorAsync += ErrorHandler;
    
  7. 処理を開始するには、次のコード行を見つけます。

    // Start processing
    

    その行を次のコードに置き換えます。

    await processor.StartProcessingAsync();
    
  8. 次のコードを見つけます。

    // Since we didn't use the "await using" syntax here, we need to explicitly dispose the processor and client    
    

    その行を次のコードに置き換えます。

    await processor.DisposeAsync();
    await client.DisposeAsync();    
    
  9. 受信メッセージをコンソールに表示するには、MessageHandler() メソッドを見つけます。 受信メッセージを処理するためにこのメソッドを登録しました。

    そのメソッド内のすべてのコードを、次のコードに置き換えます。

    Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}");
    
  10. サブスクリプションから受信したメッセージを削除するには、その次の行に、以下のコードを追加します。

    await args.CompleteMessageAsync(args.Message);
    
  11. 最後のコードが次の例のようになっていることを確認します。

    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace performancemessagereceiver
    {
        class Program
        {
            const string ServiceBusConnectionString = "Endpoint=sb://alexgeddyneil.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx";
            const string TopicName = "salesperformancemessages";
            const string SubscriptionName = "Americas";
    
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                var client = new ServiceBusClient(ServiceBusConnectionString);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
                Console.WriteLine("======================================================");
    
                var processorOptions = new ServiceBusProcessorOptions
                {
                    MaxConcurrentCalls = 1,
                    AutoCompleteMessages = false
                };
    
                ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions);
    
                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;
    
                await processor.StartProcessingAsync();
    
                Console.Read();
    
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }
    
            static async Task MessageHandler(ProcessMessageEventArgs args)
            {
                Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}");
                await args.CompleteMessageAsync(args.Message);
            }
    
            static Task ErrorHandler(ProcessErrorEventArgs args)
            {
                Console.WriteLine($"Message handler encountered an exception {args.Exception}.");
                Console.WriteLine("Exception context for troubleshooting:");
                Console.WriteLine($"- Endpoint: {args.FullyQualifiedNamespace}");
                Console.WriteLine($"- Entity Path: {args.EntityPath}");
                Console.WriteLine($"- Executing Action: {args.ErrorSource}");
                return Task.CompletedTask;
            }
        }
    }
    
  12. 変更を保存するには、Ctrl + S キーを押します。また、エディターを閉じるには、Ctrl + Q キーを押します。

サブスクリプションのトピック メッセージを取得する

  1. サブスクリプションの販売業績に関するメッセージを取得するコンポーネントを実行するには、次のコマンドを実行します。

    dotnet run --project performancemessagereceiver
    

    次の例のような出力が表示されます。

    Received message: SequenceNumber:1 Body:Total sales for Brazil in August: $13m.
    
  2. メッセージを受信しているという通知がプログラムによって返されたら、Enter キーを押してアプリを停止します。

サブスクリプションのメッセージを取得した後でメッセージ数を調べる

次のコマンドを実行して、Americas サブスクリプションにメッセージが残っていないことを確認します。 <namespace-name> を Service Bus 名前空間に必ず置き換えてください。

az servicebus topic subscription show \
     --resource-group <rgn>[sandbox resource group name]</rgn> \
     --topic-name salesperformancemessages \
     --name Americas \
     --query messageCount \
     --namespace-name <namespace-name> 

このコードの AmericasEuropeAndAsia に置き換えて、EuropeAndAsia サブスクリプションの現在のメッセージ数を確認すると、メッセージの数が 1 であることがわかります。 前のコードでは、Americas のみのトピック メッセージを取得するように設定したので、EuropeAndAsia に対するそのメッセージはまだ取得されるのを待機しています。