Windows Azure の詳細

Windows Azure サービス バス: セッションを使用するメッセージング パターン

Bruno Terkaly
Ricardo Villalobos

コード サンプルのダウンロード

Bruno Terkaly Ricardo Villalobos前回までのコラムの 1 つで、複数のソリューションを分離し、ソフトウェア アーキテクチャのスケール変換を容易にするため、クラウドでメッセージング パターンを使用する重要性を取り上げました (詳しくは、「Windows Azure のキューとサービス バスのキューを比較する」(msdn.microsoft.com/magazine/jj159884、英語) を参照してください)。キューはこのようなメッセージング パターンの 1 つで、Windows Azure プラットフォームにはこの手法を実装する主な方法が 2 つ用意されています。1 つは、キュー ストレージ サービス、もう 1 つはサービス バス キューで、どちらも、複数のユーザーがキューでメッセージを受信し、キュー内の各メッセージを競って処理するシナリオに対処できます。これは、クラウドでのワークロードの変化に対応する標準モデルで、キューのサイズを基に受信側を動的に追加または削除できるため、バックエンドの負荷分散やフェールオーバーのメカニズムを実現できます (図 1 参照)。

Queuing Messaging Pattern: Each Message Is Consumed by a Single Receiver
図 1 キュー メッセージング パターン: 1 つの受信側でそれぞれメッセージが処理される

キュー メッセージング パターンではソリューションを簡単に分離できますが、受信側がそれぞれメッセージを独自にコピーし、明確な規則に基づいてメッセージの一部を破棄するオプションが必要になる場合があります。このようなシナリオの例を図 2 に示します。この例では、小売企業が最新の製品カタログと更新した価格表などの情報を複数の店舗に送信するときによく生じる課題を示しています。

Publisher/Subscriber Messaging Pattern: Each Message Can Be Consumed More Than Once
図 2 パブリッシャー/サブスクライバー メッセージング パターン: それぞれのメッセージが複数回処理される

このようなシナリオには、パブリッシャー/サブスクライバー パターンが適しています。このパターンでは、受信側が単純にメッセージの 1 つ以上のカテゴリを利用することを表明し、そのメッセージ ストリームのコピーを含むカテゴリ別のサブスクリプションに接続します。Windows Azure サービス バスは、このパブリッシャー/サブスクライバー メッセージング パターンの実装にトピックとサブスクリプションを使用します。これにより、カテゴリ別の規則やフィルターに基づいてメッセージの分散を制御する機能を大幅に強化しています。今回は、シンプルで実践的なシナリオを使用して、Windows Azure サービス バスの各機能を当てはめる方法を説明します。今回は、次の要件を想定します。

  1. 製品はカタログ ページを基に順番に受け取る。
  2. 一部の店舗ではカタログの中の特定のカテゴリを扱わないため、このカテゴリの製品を各店舗がフィルターで除外する。
  3. メッセージをすべて受信するまで、新しいカタログ情報を店舗のシステムに適用しない。

付属のコード サンプルはすべて、プログラミング言語に C# を使用して Visual Studio 2012 で作成しました。.NET 開発者向けの Windows Azure SDK バージョン 1.8 と Windows Azure サブスクリプションへのアクセスも必要です。

プロジェクトのメッセージング基盤を設定する

コードの記述に取り掛かる前に、メッセージング ワークフローの一部となるさまざまなエンティティ (トピックとサブスクリプション) を定義する必要があります。これは、Windows Azure ポータル (manage.windowsazure.com) にアクセスすることで実現できます。資格情報を入力してログインし、次の手順を実行します。

  1. 管理ポータル左下の新規作成のアイコンをクリックします。
  2. [APP SERVICES] (アプリケーション サービス)、[SERVICE BUS TOPIC] (サービス バス トピック)、[CUSTOM CREATE] (カスタム作成) の順にクリックします (図 3 参照)。
  3. 最初のダイアログ画面でトピック名を入力し、適切な地域と Windows Azure サブスクリプション ID を選択します。これが選択した地域で 1 つ目の名前空間であれば、ウィザードに "<エンティティ名>-ns" のような名前空間キューが表示されます。この値は変更できます。
  4. 右向き矢印の [NEXT] (次へ) をクリックし、その他のプロパティを挿入します。既定値をそのまま使用できます。チェック ボックスをオンにして、トピックを作成します。
  5. 左側のナビゲーション バーのサービス バス アイコンをクリックし、名前空間の一覧を取得します。名前空間の一覧は、すぐに表示されない場合があります。名前空間が作成され、ポータルのインターフェイスが更新されるまで数秒かかります。
  6. 作成したトピックを一覧から選択し、画面下部に表示される [ACCESS KEY] (アクセス キー) をクリックします。後で使用するため、接続文字列全体を書き留めます。
  7. Windows Azure ポータル画面上部の [SUBSCRIPTIONS] (サブスクリプション) をクリックし、[CREATE A NEW SUBSCRIPTION] (新しいサブスクリプションの作成) をクリックします。ポップアップ ダイアログで名前 (ここでは「Store1Sub」を使用) を入力し、矢印をクリックして次に進みます。
  8. 次の画面では既定値をそのまま使用し、セッションを有効にするオプションをオンにします。チェック ボックスをオンにして、サブスクリプションを作成します。サブスクライバーはこのセッションを使用して、メッセージを順番に取得します。
  9. 3 つの各店舗で手順 7. と手順 8. を繰り返します。

Creating a New Service Bus Topic Using the Windows Azure Portal
図 3 Windows Azure ポータルを使用した新しいサービス バス トピックの作成

トピックとサブスクリプションを作成したら、Visual Studio から直接アクセスすることもできます。これを行うには、[表示] メニューの [サーバー エクスプローラー] をクリックしてサーバー エクスプローラーを開き、[Windows Azure Service Bus] (Windows Azure サービス バス) ノードを展開します (図 4 参照)。[Windows Azure Service Bus] (Windows Azure サービス バス) ノードを右クリックし、[接続の追加] をクリックします。名前空間名、発行者名 (通常は "所有者")、およびポータルで Windows Azure 名前空間を作成したときに書き留めた発行者のアクセス キーを入力します。

Creating a Service Bus Topic and Subscriptions Using the Visual Studio Tools
図 4 Visual Studio ツールを使用したサービス バス トピックとサブスクリプションの作成

TopicClient や SubscriptionClient など、Microsoft.ServiceBus.Messaging 名前空間のクラスを使用して、これらのエンティティをプログラムで作成および管理することができます。これらのクラスは後半で使用します。

メッセージング ワークフローの基本構造を作成したら、Visual Studio で作成した 2 つのコンソール アプリケーションを使用して、トラフィックのシミュレーションを行います (図 5 参照)。1 つ目のコンソール アプリケーションの MSDNSender が製品カタログを送信します。2 つ目のコンソール アプリケーションの MSDNReceiver が情報を受信する各店舗です。コードについてはこの後分析します。パブリッシャー/サブスクライバー パターンに当てはめると、MSDNSender がパブリッシャー、MSDNReceiver がサブスクライバーです。

Visual Studio Solution to Simulate the Products Catalog Scenario
図 5 製品カタログ シナリオのシミュレーションを行う Visual Studio ソリューション

本社から製品カタログを送信する

図 2 に示すように、本社 (パブリッシャー) がメッセージをトピックに送信します。このロジックを表すのが MSDNSender プロジェクトに含まれるメイン ファイル (Program.cs) のコードです。製品の一覧を個別のメッセージとしてトピックに送信するロジックとコードを Program.cs ファイルにカプセル化しています。このコードのさまざまなセクションを見ていきましょう。最初は、Main メソッドです。まず、次のようにトピックのクライアントを作成します。

// Create a topicClient using the
// Service Bus credentials
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
  serviceBusConnectionString, topicName);

topicClient を作成したら、パブリッシャーはこれを使用してメッセージを送信できます。送信する製品一覧は、ProductsCatalog.xml という XML ファイルに保存されています。このファイルには、オブジェクトの配列に変換される 10 個の製品エンティティの一覧が含まれます。次に、製品を Product.cs ファイルに収められている Catalog クラスおよび Product クラスにマップします。

// Deserialize XML file with Products, and store them in an object array
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
StreamReader reader = new StreamReader(path);
catalog = (Catalog) serializer.Deserialize(reader);
reader.Close();

カタログの配列内の各 Product クラスは図 6 に示す構造体を表します。

図 6 カタログの製品を表すクラス

public class Product
  {
    [System.Xml.Serialization.XmlElement("ProductId")]
    public string ProductId { get; set; }
    [System.Xml.Serialization.XmlElement("ProductName")]
    public string ProductName { get; set; }
    [System.Xml.Serialization.XmlElement("Category")]
    public string Category { get; set; }
    [System.Xml.Serialization.XmlElement("CatalogPage")]
    public int CatalogPage { get; set; }
    [System.Xml.Serialization.XmlElement("MSRP")]
    public double MSRP { get; set; }
    [System.Xml.Serialization.XmlElement("Store")]
    public string Store { get; set; }
  }

配列のループ内では、CreateMessage メソッドを呼び出して Product オブジェクトからさまざまなプロパティを抽出し、送信メッセージに割り当てます。次の 2 つのプロパティには特に注意が必要です。

if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;

セッションは特に重要で、受信側はこのセッションを使って特定の論理グループに属するすべてのメッセージを受信したかどうかを判断します。ここでは、SessionId メッセージ プロパティを設定することで、同じ catalogName 値のメッセージをすべて受信するまで、受信側がカテゴリ情報を使用しないように指定しています。また、配列の最後の製品には、IsLastMessageInSession という新しいプロパティを追加します。これにより、受信側はセッションの最後のメッセージを受信し、カタログをすべて処理したかどうかを判断できます。図 7 は、MSDNSender の実行画面を示しています。

Execution of the MSDNSender Project
図 7 MSDNSender プロジェクトの実行

店舗でサブスクリプションを使用して製品カタログを受信する

ここまでで、カタログと製品をトピックに送信し、異なるサブスクリプションにコピーしました。続いて、メッセージを受信および処理する MSDNReceiver プロジェクトについて見てみましょう。Program.cs ファイルの Main メソッドのコードでは、ユーザーが Console.ReadLine コマンドを使用して指定する情報に基づいて、サブスクリプションのクライアントを作成します。ユーザーは利用している店舗の番号を入力し、この番号で受信するメッセージを特定するものとします。つまり、各店舗はその店舗に関連するメッセージのみを処理します。

Console.WriteLine("Enter Store Number");
  string storeNumber = Console.ReadLine();
  Console.WriteLine("Selecting Subscription for Store...");
  // Create a Subscription Client to the Topic
  SubscriptionClient subscriptionClient =
    SubscriptionClient.CreateFromConnectionString(
    serviceBusConnectionString, topicName,
    "Store" + storeNumber.Trim() + "Sub",
    ReceiveMode.PeekLock);

前のセクションで説明したように、セッションに基づいてサブスクリプションからメッセージを受信するため、次のコードを使用して続きのメッセージを要求する必要があります。

MessageSession sessionReceiver =
  subscriptionClient.AcceptMessageSession(TimeSpan.FromSeconds(5));

基本的に、クライアントはサブスクリプション内のメッセージを調べ、SessionId プロパティが null ではないメッセージを処理対象とします。また、5 秒以内にこのようなメッセージを検出できなかった場合、要求はタイムアウトして受信側アプリケーションは終了します。セッションが見つかれば、ReceivingSessionMessages メソッドを呼び出します。このコードを見る前に、セッション状態について大まかに説明します。開発者は、セッション状態を使用して、同じトランザクションに属するメッセージを受信しながら使用可能な情報を保存できます。ここでは、セッション状態を使用して、受信した最後のカタログ ページと、適切な順序に従わずに受信したメッセージ (製品) を "記憶" します。

これを基に、コードのワークフローは次のようになります。

  1. 現在のメッセージは、ReceiveSessionMessages メソッドで受信します (図 8 参照)。このメソッドでは、ProcessMessage メソッド (図 9) を使用してメッセージを処理します。
  2. ProcessMessage メソッド内では、メッセージを順番どおりに受け取っていない場合にそのメッセージを自動的に保留にし、メッセージ ID をセッション状態に保存します。正しい順番のメッセージは "完了" とマークし、サブスクリプションから削除します。また、次に受信予定の番号のカタログ ページもセッションに保存します。
  3. 受信した現在のメッセージを処理したら、ReceiveSessionMessages メソッドの後続のコードで、セッション内に保留したメッセージ ID を調べ、最新のカタログ ページを基に再度処理を試みます。
  4. セッションのすべてのメッセージを受け取ったら、受信側の処理は終了します。

図 8 ReceivedSessionMessages メソッドのコード

static void ReceiveSessionMessages(MessageSession receiver)
  {
    // Read messages from subscription until subscription is empty
    Console.WriteLine("Reading messages from subscription {0}", 
      receiver.Path);
    Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
    Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
    SequenceState sessionState = GetState(receiver);
    BrokeredMessage receivedMessage;
    while ((receivedMessage = receiver.Receive()) != null)
    {
      string sessionId = receiver.SessionId;
      ProcessMessage(receivedMessage, ref sessionState, receiver);
      while (sessionState.GetNextOutOfSequenceMessage() != -1)
      {
        // Call back deferred messages
        Console.WriteLine("Calling back for deferred message: Category {0},
          Message sequence {1}", receiver.SessionId,
            sessionState.GetNextSequenceId());
        receivedMessage = receiver.Receive(
          sessionState.GetNextOutOfSequenceMessage());
        ProcessMessage(receivedMessage, ref sessionState, receiver);
      }
      if (receivedMessage.Properties.ContainsKey(
        "IsLastMessageInSession"))
        break;
    }
    SetState(receiver, null);
    receiver.Close();
  }

図 9 ProcessMessage メソッドのコード

static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
  {
    if (session != null)
    {
      int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
      if (sessionState.GetNextSequenceId() == messageId)
      {
        OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
        sessionState.SetNextSequenceId(messageId + 1);
        message.Complete();
        SetState(session, sessionState);
      }
      else
      {
        Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
          session.SessionId, messageId);
        sessionState.AddOutOfSequenceMessage(messageId, 
          message.SequenceNumber);
        message.Defer();
        SetState(session, sessionState);
      }
    }
    Thread.Sleep(receiverDelay);
  }

このプロジェクトでは、保留するメッセージ ID をセッション状態に保存していますが、これは失われる可能性があります。そのため、運用環境では永続ストレージ (Windows Azure テーブルなど) を使用することをお勧めします。メッセージに (送信処理中に設定される) IsLastMessageSessionInSession プロパティが含まれる場合、セッション ループを終了します。MSDNReceiver プロジェクトのコンソール出力を図 10 に示します。

Execution of the MSDNReceiver project
図 10 MSDNReceiver プロジェクトの実行

Windows Azure サービス バス サブスクリプションでは、処理する前にフィルターでメッセージを除外する具体的な規則を作成できます。ここでは、製品をカテゴリまたは店舗の番号別に振り分ける規則を比較的簡単に作成できます (この規則について、今回のプロジェクトでは無視しています)。規則は Windows Azure ポータルまたは Visual Studio ツールを使って直接プログラムで作成できます。

まとめ

Windows Azure サービス バスにより、非常に堅牢で柔軟性の高いパブリッシャー/サブスクライバー パターンを実装できます。トピックとサブスクリプションを使用して、さまざまなシナリオに対処できます。複数の受信側にメッセージを送信する複数の送信側をサポートする機能と、メッセージを論理的にグループ化して並べ替える機能を組み合わせることで、開発者の可能性が大きく広がります。さらに、状態を追跡する永続的セッションを活用できるため、メッセージの論理的なグループ化と、その順番の制御を簡単に行うことができます。分散環境が一般的に使用される現在、関連するメッセージング パターンとツールの使用方法を理解することは、クラウドで作業する今日のソフトウェア アーキテクトにとってはきわめて重要になります。

Bruno Terkaly は、マイクロソフトの開発者エバンジェリストです。彼の深い知識は、多数のプラットフォーム、言語、フレームワーク、SDK、ライブラリ、および API を使用してコードを作成し、現場で長年の経験を積むことで得られたものです。コードの作成、ブログ、クラウド ベースのアプリケーション構築 (特に、Windows Azure プラットフォームの使用) に関するライブ プレゼンテーションに携わっています。

Ricardo Villalobos は、経験豊かなソフトウェア アーキテクトとして、サプライ チェーン管理業界の企業用アプリケーションを 15 年以上にわたって設計および作成しています。さまざまな技術認定資格の保持者であり、ダラス大学で経営管理の修士号を取得しています。彼はマイクロソフトの Windows Azure CSV 開発支援グループのクラウド アーキテクトを務めています。

この記事のレビューに協力してくれた技術スタッフの Abhishek Lal に心より感謝いたします。