既存の非同期ソースとのブリッジング

.NET イベントに加えて、他の非同期データ ソースも.NET Frameworkに存在します。 そのうちの 1 つは非同期メソッド パターンです。 この設計パターンでは、2 つのメソッドが提供されます。 計算を開始するために 1 つのメソッド (通常は BeginX という名前) が使用され、2 番目のメソッド (通常は EndX という名前) に渡される IAsyncResult ハンドルが返され、計算結果が取得されます。 完了は通常、AsyncCallback デリゲートを実装するか、IAsyncResult.IsCompleted をポーリングすることによって通知されます。 このパターンに準拠するコードは、多くの場合、読み取りと保守が困難です。 このトピックでは、Rx ファクトリ メソッドを使用して、このような非同期データ ソースを監視可能なシーケンスに変換する方法について説明します。

非同期パターンを監視可能なシーケンスに変換する

.NET の多くの非同期メソッドは、BeginX や EndX などのシグネチャを使用して書き込まれます。ここで、X は非同期的に実行されるメソッド名です。 BeginX は、メソッドを実行するための引数を受け取ります。AsyncCallback は、IAsyncResult を受け取り、何も返さなかったアクションであり、最後にオブジェクトの状態になります。 EndX は、非同期呼び出しの値を取得するために AsyncCallback から渡される IAsyncResult を受け取ります。

Observable 型の FromAsyncPattern 演算子は、Begin メソッドと End メソッド (パラメーターとして演算子に渡される) をラップし、Begin と同じパラメーターを受け取り、監視可能な メソッドを返す関数を返します。 この監視可能は、1 つの値を発行するシーケンスを表します。これは、指定した呼び出しの非同期結果です。

次の例では、IAsyncResult パターンを使用する Stream オブジェクトの BeginRead と EndRead を、監視可能なシーケンスを返す関数に変換します。 FromAsyncPattern 演算子のジェネリック パラメーターには、 コールバックまで BeginRead の引数の型を指定します。 EndRead メソッドは値を返すので、この型を FromAsyncPattern の最終的なジェネリック パラメーターとして追加します。 をポイントvarreadすると、FromAsyncPattern の戻り値は、次のシグネチャFunc<byte[], int32,int32, IObservable<int32>>を持つ関数デリゲートであることがわかります。つまり、この関数は 3 つのパラメーター (BeginRead の場合と同じ) を受け取り、IObservable<Int32> を返します。 この IObservable には、EndRead によって返される整数である 1 つの値が含まれており、0 から要求したバイト数までのストリームから読み取られたバイト数が含まれます。 IAsyncResult ではなく IObservable が得られるようになったため、Observables で使用できるすべての LINQ 演算子を使用し、サブスクライブ、解析、または作成できます。

Stream inputStream = Console.OpenStandardInput();
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
byte[] someBytes = new byte[10];
IObservable<int> source = read(someBytes, 0, 10);
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();