LINQ 演算子を使用した監視可能シーケンスのクエリ

既存の .NET イベントを使用したブリッジングでは、既存の .NET イベントを監視可能なシーケンスに変換して、それらをサブスクライブしました。 このトピックでは、観察可能なシーケンスの最初のクラスの性質を IObservable<T> オブジェクトとして見ていきます。このオブジェクトを操作するために、Rx アセンブリによって汎用 LINQ 演算子が提供されます。 ほとんどの演算子は、監視可能なシーケンスを受け取り、それに対していくつかのロジックを実行し、別の監視可能なシーケンスを出力します。 さらに、コード サンプルからわかるように、ソース シーケンスで複数の演算子を連結して、結果のシーケンスを正確な要件に合わせて調整することもできます。

異なる演算子の使用

前のトピックの Create 演算子と Generate 演算子を使用して、単純なシーケンスを作成して返しました。 また、FromEventPattern 演算子を使用して、既存の .NET イベントを監視可能なシーケンスに変換しました。 このトピックでは、Observable 型の他の静的 LINQ 演算子を使用して、データをフィルター処理、グループ化、変換できるようにします。 このような演算子は、監視可能なシーケンスを入力として受け取り、出力として監視可能なシーケンスを生成します。

異なるシーケンスの組み合わせ

このセクションでは、さまざまな観測可能なシーケンスを 1 つの監視可能なシーケンスに結合する演算子の一部について説明します。 シーケンスを組み合わせると、データは変換されないことに注意してください。

次の例では、Concat 演算子を使用して 2 つのシーケンスを 1 つのシーケンスに結合し、それをサブスクライブします。 説明のため、非常に単純な Range(x, y) 演算子を使用して、x で始まり、その後 y の連続する数値を生成する整数のシーケンスを作成します。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

結果のシーケンスが であることに 1,2,3,1,2,3注意してください。 これは、Concat 演算子を使用する場合、1 番目のシーケンス () がそのすべての値のプッシュを完了するまで、2 番目のシーケンス (source1source2) はアクティブにならないためです。 完了した後 source1 にのみ、 source2 結果のシーケンスに値のプッシュが開始されます。 その後、サブスクライバーは、結果のシーケンスからすべての値を取得します。

これを Merge 演算子と比較します。 次のサンプル コードを実行すると、 が表示 1,1,2,2,3,3されます。 これは、2 つのシーケンスが同時にアクティブであり、値がソースで発生するとプッシュアウトされるためです。 結果のシーケンスは、最後のソース シーケンスが値のプッシュを完了したときにのみ完了します。

マージを機能させるには、すべてのソース監視可能シーケンスが同じ種類の IObservable<T> である必要があることに注意してください。 結果のシーケンスは、IObservable<T> 型になります。 シーケンスの途中で OnError が生成された場合 source1 、結果のシーケンスはすぐに完了します。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Catch 演算子を使用して、別の比較を行うことができます。 この場合、 がエラーなしで完了した場合 source1source2 は開始されません。 したがって、次のサンプル コードを実行すると1,2,3、 (を生成4,5,6する) は無視されるためsource2、取得されます。

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

最後に、OnErrorResumeNext を見てみましょう。 エラーが原因で source2 完了できない場合 source1 でも、この演算子は に移動します。 次の例では、source1例外 (Throw 演算子を使用して) で終了するシーケンスを表していても、サブスクライバーは によってsource2発行された値 (1,2,3) を受け取ります。 したがって、いずれかのソース シーケンスでエラーが発生することが予想される場合は、OnErrorResumeNext を使用して、サブスクライバーが引き続きいくつかの値を受け取ることを保証することをお勧めします。

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

これらすべての組み合わせ演算子が機能するためには、観測可能なすべてのシーケンスが同じ種類の T である必要があることに注意してください。

Projection

Select 演算子は、監視可能なシーケンスの各要素を別の形式に変換できます。

次の例では、整数のシーケンスをそれぞれ長さ n の文字列に投影します。

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

次の例では、既存の .NET イベントを使用した ブリッジング に関するトピックで説明した .NET イベント変換の例の拡張です。Select 演算子を使用して、IEventPattern<MouseEventArgs> データ型を Point 型に投影します。 このようにして、マウスの移動イベント シーケンスを、次の 「フィルター処理」セクションで見られるように、さらに解析および操作できるデータ型に変換します。

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

最後に、SelectMany 演算子を見てみましょう。 SelectMany 演算子には多くのオーバーロードがあり、そのうちの 1 つはセレクター関数引数を受け取ります。 このセレクター関数は、ソース監視可能によってプッシュされたすべての値に対して呼び出されます。 これらの値ごとに、セレクターによってミニ監視可能なシーケンスに投影されます。 最後に、SelectMany 演算子は、これらすべてのミニ シーケンスを 1 つの結果シーケンスにフラット化し、サブスクライバーにプッシュします。

SelectMany から返される監視可能オブジェクトは、ソース シーケンスとセレクターによって生成されたすべてのミニ監視可能なシーケンスが完了した後に OnCompleted を発行します。 ソース ストリームでエラーが発生したとき、セレクター関数によって例外がスローされたとき、またはミニ監視可能なシーケンスのいずれかでエラーが発生したときに、OnError が発生します。

次の例では、最初に 5 秒ごとに整数を生成するソース シーケンスを作成し、(Take 演算子を使用して) 生成された最初の 2 つの値を取得することにしました。 次に、 を使用 SelectMany して、 の別のシーケンス {100, 101, 102}を使用して、これらの各整数を投影します。 これにより、2 つのミニ監視可能なシーケンスと が生成されます {100, 101, 102}{100, 101, 102}。 これらは最終的に の整数 {100, 101, 102, 100, 101, 102} の単一ストリームにフラット化され、オブザーバーにプッシュされます。

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Filtering

次の例では、Generate 演算子を使用して、単純な観測可能な数値シーケンスを作成します。 Generate 演算子には、いくつかのオーバーロードがあります。 この例では、初期状態 (この例では 0)、終了する条件付き関数 (10 回未満)、反復子 (+1)、結果セレクター (現在の値の 2 乗関数) を受け取ります。 は、Where 演算子と Select 演算子を使用して、15 より小さいものだけを出力します。

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

次の例は、このトピックで前述したプロジェクションの例の拡張機能です。 このサンプルでは、Select 演算子を使用して、IEventPattern<MouseEventArgs> データ型を Point 型に投影しました。 次の例では、Where 演算子と Select 演算子を使用して、関心のあるマウスの動きのみを選択します。 この場合は、マウスの移動を最初の 2 等点 (x 座標と y 座標が等しい) 上の移動にフィルター処理します。

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

時間ベースの操作

Buffer 演算子を使用して、時間ベースの操作を実行できます。

監視可能なシーケンスをバッファリングすることは、監視可能なシーケンスの値が、指定された期間またはカウントしきい値に基づいてバッファーに格納されることを意味します。 これは、シーケンスによって大量のデータがプッシュされ、サブスクライバーにこれらの値を処理するリソースがない場合に特に役立ちます。 時間またはカウントに基づいて結果をバッファリングし、条件を超えたとき (またはソース シーケンスが完了した場合) にのみ値のシーケンスを返すことで、サブスクライバーは独自のペースで OnNext 呼び出しを処理できます。 

次の例では、最初に 1 秒ごとに整数の単純なシーケンスを作成します。 次に、Buffer 演算子を使用し、各バッファーがシーケンスから 5 つの項目を保持することを指定します。 OnNext は、バッファーがいっぱいになると呼び出されます。 次に、Sum 演算子を使用してバッファーの合計を集計します。 バッファーが自動的にフラッシュされ、別のサイクルが開始されます。 印刷結果は 10, 35, 60… 、10=0+1+2+3+4、35=5+6+7+8+9 などになります。

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

指定した期間のバッファーを作成することもできます。 次の例では、バッファーは 3 秒間累積された項目を保持します。 印刷結果は 3、12、21..になります。 3=0+1+2、12=3+4+5 など。

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Buffer または Window を使用している場合は、フィルター処理する前にシーケンスが空ではないことを確認する必要があることに注意してください。

カテゴリ別の LINQ 演算子

カテゴリ別の LINQ 演算子に関するトピックでは、Observable 型によってカテゴリ別に実装されるすべての主要な LINQ 演算子の一覧を示します。具体的には、作成、変換、結合、機能、数学、時間、例外、その他、選択、プリミティブ。

参照

リファレンス

Observable

概念

カテゴリ別の LINQ 演算子