トピックを使用してメッセージを送受信するコードを記述する

完了

分散アプリケーションでは、一部のメッセージは 1 つの受信側コンポーネントに対して使用される必要があります。 他のメッセージは、複数の宛先に到達する必要があります。

ユーザーが自転車の注文をキャンセルするとどうなるか考えてみます。 注文のキャンセルは、最初の注文とは少し異なります。 注文が行われるときのワークフローでは、注文の支払い処理が済むまで待った後、注文を現地の店舗に送信しました。 キャンセル操作の場合は、店舗と支払いプロセッサの "両方" に同時に通知します。 このようにすることで、配送ドライバーの時間が無駄になる可能性を最小限に抑えます。

複数のコンポーネントが同じメッセージを受信できるように、Azure Service Bus の "トピック" を使用します。 次に、コードを記述するプロセスと考慮事項について見ていきます。

トピックを含むコードと、キューを含むコードの違い

送信されたすべてのメッセージを、サブスクライブしているすべてのコンポーネントに配信する場合は、トピックを使用します。 トピックを使用するコードの記述は、キューを置き換える方法の 1 つです。 同じ Azure.Messaging.ServiceBus NuGet パッケージを使用し、接続文字列を構成して、非同期プログラミング パターンを使用します。

また、メッセージを送信するには同じ ServiceBusClient クラスと ServiceBusSender クラス、メッセージを受信するには ServiceBusProcessor クラスを使用します。

サブスクリプションでフィルターを設定する

トピックに送信された特定のメッセージが特定のサブスクリプションに配信されるようにしたい場合は、トピックのサブスクリプションに 1 つ以上のフィルターを設定できます。 たとえば、自転車アプリケーションでは、店舗でユニバーサル Windows プラットフォーム (UWP) アプリケーションを実行しています。 各店舗で OrderCancellation トピックにサブスクライブし、独自の StoreId でフィルター処理できます。 複数の場所にある店に不要なメッセージが送信されないので、インターネットの帯域幅が節約されます。 一方、支払い処理コンポーネントは、すべての OrderCancellation メッセージをサブスクライブします。

次の 3 種類のフィルターのいずれかを使用できます。

  • ブール フィルター:TrueFilter は、トピックに送信されたすべてのメッセージが現在のサブスクリプションに確実に配信されるようにします。 FalseFilter は、現在のサブスクリプションにメッセージがまったく配信されないようにします (これは、サブスクリプションを効果的にブロックまたはオフにします)。
  • SQL フィルター:SQL フィルターは、SQL クエリの WHERE 句と同じ構文を使用して条件を指定します。 このフィルターに対して評価されたときに True を返すメッセージのみが、サブスクライバーに配信されます。
  • 相関関係フィルター:相関関係フィルターは、各メッセージのプロパティと照合される条件のセットを保持します。 フィルターのプロパティとメッセージのプロパティが同じ値である場合、一致するものと見なされます。

StoreId フィルターの場合は、SQL フィルターを使用することも "できます"。 SQL フィルターは最も柔軟性がありますが、計算の負荷は最も大きく、フィルターによって Service Bus のスループットが低下する可能性があります。 この場合は、相関関係フィルターを選択します。

トピックにメッセージを送信するには

メッセージをトピックに送信するには、次の手順のようにします。

すべての送信または受信コンポーネントで、Service Bus トピックを呼び出すコード ファイルに次の using ステートメントを追加します。

using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

メッセージを送信するには、最初に新しい ServiceBusClient オブジェクトを作成し、それに接続文字列とトピックの名前を渡します。

await using var client = new ServiceBusClient(connectionString);

次に、ServiceBusClient オブジェクトで CreateSender メソッドを呼び出して、トピック名を指定することで、ServiceBusSender オブジェクトを作成します。

ServiceBusSender sender = client.CreateSender(topicName);

ServiceBusSender.SendMessageAsync() メソッドを呼び出して ServiceBusMessage を渡すことにより、トピックにメッセージを送信できます。 キューと同じように、メッセージは UTF-8 でエンコードされた文字列の形式にする必要があります。

string message = "Cancel! I have changed my mind!";
var message = new ServiceBusMessage(message);

// Send the message to the topic.
await sender.SendMessageAsync(message);

サブスクリプションからメッセージを受信するには

サブスクリプションからメッセージを受信するには、ServiceBusProcessor オブジェクトを作成し、トピック名とサブスクリプション名を渡す必要があります。

processor = client.CreateProcessor(topicName, subscriptionName, options);

次に、メッセージ ハンドラーとエラー ハンドラーを登録します。

// Specify the handler method for messages.
processor.ProcessMessageAsync += MessageHandler;

// Specify the handler method for errors.
processor.ProcessErrorAsync += ErrorHandler;

メッセージ ハンドラー内で処理を行ってから、ProcessMessageEventArgs.CompleteMessageAsync() メソッドを呼び出してサブスクリプションからメッセージを削除します。

// Complete the message. The message is deleted from the subscription. 
await args.CompleteMessageAsync(args.Message);