単純な監視可能シーケンスの作成とサブスクライブ

監視可能なシーケンスを作成するために、IObservable<T> インターフェイスを手動で実装する必要はありません。 同様に、シーケンスをサブスクライブするために IObserver<T> を実装する必要はありません。 リアクティブ拡張アセンブリをインストールすると、 Observable 型を利用できます。この型では、0 個以上の要素を含む単純なシーケンスを作成するための静的 LINQ 演算子が多数用意されています。 さらに、Rx は、デリゲートの観点から OnNext、OnError、OnCompleted ハンドラーのさまざまな組み合わせを受け取る Subscribe 拡張メソッドを提供します。

単純なシーケンスの作成とサブスクライブ

次の例では、 Observable 型の Range 演算子を使用して、数値の単純な監視可能なコレクションを作成します。 オブザーバーは 、Observable クラスの Subscribe メソッドを使用してこのコレクションをサブスクライブし、OnNext、OnError、OnCompleted を処理するデリゲートであるアクションを提供します。

Range 演算子には、いくつかのオーバーロードがあります。 この例では、x で始まる整数のシーケンスを作成し、その後 y の連続する数値を生成します。 

サブスクリプションが発生するとすぐに、値がオブザーバーに送信されます。 その後、OnNext デリゲートによって値が出力されます。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

オブザーバーが監視可能なシーケンスをサブスクライブする場合、Subscribe メソッドを呼び出すスレッドは、シーケンスが完了するまで実行されるスレッドとは異なる場合があります。 したがって、Subscribe 呼び出しは、シーケンスの観察が完了するまで呼び出し元がブロックされないという点で非同期です。 詳細については、「 Schedulers の使用 」トピックで説明します。

Subscribe メソッドは IDisposable を返すので、シーケンスのサブスクライブを解除して簡単に破棄できます。 監視可能なシーケンスで Dispose メソッドを呼び出すと、オブザーバーは監視可能なデータのリッスンを停止します。  通常、早い段階でサブスクライブ解除する必要がある場合や、ソースの監視可能シーケンスの有効期間がオブザーバーよりも長い場合を除き、Dispose を明示的に呼び出す必要はありません。 Rx のサブスクリプションは、ファイナライザーを使用せずに、ファイア アンド フォーゲット シナリオ用に設計されています。 ガベージ コレクターによって IDisposable インスタンスが収集されると、Rx はサブスクリプションを自動的に破棄しません。 ただし、Observable 演算子の既定の動作は、サブスクリプションをできるだけ早く破棄することです (つまり、OnCompleted メッセージまたは OnError メッセージが発行された場合)。 たとえば、コード var x = Observable.Zip(a,b).Subscribe(); は x を a と b の両方のシーケンスにサブスクライブします。 がエラーをスローした場合、x はすぐに b からサブスクライブ解除されます。

また、指定した OnNext、OnError、OnCompleted アクション デリゲートからオブザーバーを作成して返す Observable 型の Create 演算子を使用するようにコード サンプルを調整することもできます。 その後、このオブザーバーを Observable 型の Subscribe メソッドに渡すことができます。 次の例は、これを行う方法を示しています。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

監視可能なシーケンスを最初から作成するだけでなく、既存の列挙子、.NET イベント、非同期パターンを監視可能なシーケンスに変換できます。 このセクションの他のトピックでは、これを行う方法について説明します。

このトピックでは、監視可能なシーケンスを最初から作成できるいくつかの演算子のみを示していることに注意してください。 その他の LINQ 演算子の詳細については、「LINQ 演算子 を使用した監視可能なシーケンスのクエリ」を参照してください。

タイマーの使用

次の例では、Timer 演算子を使用してシーケンスを作成します。 シーケンスは、5 秒が経過した後に最初の値をプッシュし、1 秒ごとに後続の値をプッシュします。 説明のために、Timestamp 演算子をクエリに連結して、プッシュされた各値が発行時刻までに追加されるようにします。 これにより、このソース シーケンスをサブスクライブするときに、その値とタイムスタンプの両方を受け取ることができます。

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

出力は次のようになります。

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

Timestamp 演算子を使用して、シーケンスが開始されてから 5 秒後に最初の項目が実際にプッシュアウトされ、各項目が 1 秒後に発行されることを確認しました。

列挙可能なコレクションを監視可能なシーケンスに変換する

ToObservable 演算子を使用すると、ジェネリック列挙可能なコレクションを監視可能なシーケンスに変換し、それをサブスクライブできます。

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

コールドオブザーブルとホットオブザーブル

コールドオブザーバはサブスクリプションで実行を開始します。つまり、監視可能なシーケンスは、Subscribe が呼び出されたときにオブザーバーに値をプッシュし始めるだけです。 値はサブスクライバー間でも共有されません。 これは、サブスクリプションがアクティブになる前に既に値を生成しているマウス移動イベントや株式ティッカーなどのホットオブザーブルとは異なります。 オブザーバーは、ホットな監視可能なシーケンスをサブスクライブすると、ストリーム内の現在の値を取得します。 ホット監視可能シーケンスはすべてのサブスクライバー間で共有され、各サブスクライバーはシーケンス内の次の値をプッシュされます。 たとえば、誰も特定の株式ティッカーを購読していない場合でも、ティッカーは市場の動きに基づいて値を更新し続けます。 サブスクライバーがこのティッカーに関心を登録すると、自動的に最新のティックが取得されます。

次の例は、コールド オブザブル シーケンスを示しています。 この例では、Interval 演算子を使用して、特定の間隔 (この場合は 1 秒ごと) でポンプで出力される数値の単純な観測可能なシーケンスを作成します。

その後、2 つのオブザーバーがこのシーケンスをサブスクライブし、その値を出力します。 サブスクライバーごとにシーケンスがリセットされ、2 番目のサブスクリプションが最初の値からシーケンスを再起動することがわかります。

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

次の例では、Publish 演算子を使用して、前のコールド監視可能なシーケンスをホットシーケンス source に変換します。これは、 という名前 hotの IConnectableObservable インスタンスを返します。 Publish オペレーターは、1 つのサブスクリプションを複数のサブスクライバーにブロードキャストすることで、サブスクリプションを共有するメカニズムを提供します。 hot はプロキシとして機能し、 を sourceサブスクライブし、 から source値を受け取ると、その値を独自のサブスクライバーにプッシュします。 バッキング source のサブスクリプションを確立し、値の受信を開始するには、IConnectableObservable.Connect() メソッドを使用します。 IConnectableObservable は IObservable を継承するため、Subscribe を使用して、実行を開始する前でもこのホット シーケンスをサブスクライブできます。 この例では、ホット シーケンスはサブスクライブ時 subscription1 に開始されていないことに注意してください。 したがって、値はサブスクライバーにプッシュされません。 Connect を呼び出すと、値は に subscription1プッシュされます。 3 秒の遅延が発生した後、 subscription2 現在の位置 (この場合は 3) から最後まですぐに値の受信をサブスクライブ hot して開始します。 出力は次のようになります。

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();