件名の使用

Subject<T> 型は、オブザーバーと監視可能の両方であるという意味で、IObservable<T> と IObserver<T> の両方を実装します。 サブジェクトを使用してすべてのオブザーバーをサブスクライブし、バックエンド データ ソースにサブジェクトをサブスクライブできます。 このようにして、サブジェクトはサブスクライバーとソースのグループのプロキシとして機能できます。 サブジェクトを使用して、キャッシュ、バッファリング、時間シフトを使用してカスタム監視可能を実装できます。 さらに、件名を使用して、複数のサブスクライバーにデータをブロードキャストすることもできます。

既定では、サブジェクトはスレッド間で同期を実行しません。 スケジューラーを使用するのではなく、すべてのシリアル化と文法の正確性がサブジェクトの呼び出し元によって処理されることを前提としています。  件名は、単にサブスクライバーのスレッド セーフなリスト内のすべてのサブスクライブされたオブザーバーにブロードキャストします。 これにより、オーバーヘッドを削減し、パフォーマンスを向上させるという利点があります。 ただし、スケジューラを使用してオブザーバーへの発信呼び出しを同期する場合は、Synchronize メソッドを使用して同期できます。

件名の使用

次の例では、件名を作成し、その件名をサブスクライブした後、同じサブジェクトを使用してオブザーバーに値を発行します。 これにより、パブリケーションとサブスクリプションを同じソースに結合します。

IObserver<T> の取得に加えて、Subscribe メソッドには onNext の Action<T> を受け取るオーバーロードもあります。これは、アイテムが発行されるたびにアクションが実行されることを意味します。 このサンプルでは、OnNext が呼び出されるたびに、項目がコンソールに書き込まれます。

Subject<int> subject = new Subject<int>();
var subscription = subject.Subscribe(
                         x => Console.WriteLine("Value published: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
subject.OnNext(1);

subject.OnNext(2);

Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subscription.Dispose();

次の例は、Subject のプロキシとブロードキャストの性質を示しています。 最初に、1 秒ごとに整数を生成するソース シーケンスを作成します。 次に、Subject を作成し、ソースにオブザーバーとして渡して、このソース シーケンスによってプッシュされたすべての値を受け取るようにします。 その後、件名をソースとして、さらに 2 つのサブスクリプションを作成します。 subSubject1サブスクリプションと subSubject2 サブスクリプションは、Subject によって (ソースから) 渡された任意の値を受け取ります。

var source = Observable.Interval(TimeSpan.FromSeconds(1));
Subject<long> subject = new Subject<long>();
var subSource = source.Subscribe(subject);
var subSubject1 = subject.Subscribe(
                         x => Console.WriteLine("Value published to observer #1: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
var subSubject2 = subject.Subscribe(
                         x => Console.WriteLine("Value published to observer #2: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subSubject1.Dispose();
subSubject2.Dispose();

異なる種類のサブジェクト

Rx ライブラリの Subject<T> 型は、ISubject<T> インターフェイスの基本的な実装です (ISubject<T> インターフェイスを実装して独自のサブジェクト型を作成することもできます)。 さまざまな機能を提供する ISubject<T> の他の実装があります。 これらの型はすべて、OnNext を介してプッシュされた値の一部 (またはすべて) を格納し、オブザーバーにブロードキャストし直します。 このようにして、ホットオブザブルをコールドオブザブルに変換します。 つまり、これらのいずれかを複数回サブスクライブすると (つまり、Subscribe - Unsubscribe ->> Subscribe again)、同じ値の少なくとも 1 つが再び表示されます。 ホットオブザーブルとコールドオブザーブルの詳細については、「 単純な監視可能シーケンスの作成とサブスクライブ 」トピックの最後のセクションを参照してください。

ReplaySubject には、発行されたすべての値が格納されます。 そのため、サブスクリプションをサブスクライブすると、特定の値がプッシュされた後にサブスクリプションが入力された可能性がある場合でも、発行された値の履歴全体が自動的に受け取ります。BehaviourSubject は ReplaySubject に似ていますが、公開された最後の値のみが格納されている点が除きます。 また、BehaviourSubject には、初期化時に T 型の既定値も必要です。 この値は、サブジェクトによって他の値がまだ受信されていない場合にオブザーバーに送信されます。 つまり、Subject が既に完了していない限り、すべてのサブスクライバーは Subscribe ですぐに値を受け取ります。 AsyncSubject は再生と動作のサブジェクトに似ていますが、最後の値のみが格納され、シーケンスが完了したときにのみ発行されます。 AsyncSubject 型は、ソース監視可能がホットで、オブザーバーがサブスクライブする前に完了する可能性がある状況に使用できます。 この場合でも、 AsyncSubject は最後の値を指定し、将来のサブスクライバーに発行できます。