ReplaySubject<T>.Subscribe メソッド

オブザーバーをサブジェクトにサブスクライブします。

Namespace:System.Reactive.Subjects
アセンブリ: System.Reactive (System.Reactive.dll)

構文

'Declaration
Public Function Subscribe ( _
    observer As IObserver(Of T) _
) As IDisposable
'Usage
Dim instance As ReplaySubject
Dim observer As IObserver(Of T)
Dim returnValue As IDisposable

returnValue = instance.Subscribe(observer)
public IDisposable Subscribe(
    IObserver<T> observer
)
public:
virtual IDisposable^ Subscribe(
    IObserver<T>^ observer
) sealed
abstract Subscribe : 
        observer:IObserver<'T> -> IDisposable 
override Subscribe : 
        observer:IObserver<'T> -> IDisposable 
public final function Subscribe(
    observer : IObserver<T>
) : IDisposable

パラメーター

  • オブザーバー
    種類: System.IObserver<T>
    件名をサブスクライブするオブザーバー。

戻り値

種類: System.IDisposable
サブジェクトからオブザーバーの登録を解除するために使用できる IDisposable オブジェクト。

実装

IObservable<T>.Subscribe(IObserver<T>)

解説

Subscribe を呼び出すと、ReplaySubject の IObservable インターフェイスへのサブスクリプションが作成されます。 ReplaySubject は、IObservable インターフェイスを介して、独自のサブスクリプションから受信したアイテムを発行します。

この例では、観測可能な文字列シーケンスの形式のモック ニュース フィードである NewsHeadlineFeed クラスを作成しました。 Generate 演算子を使用して、3 秒以内にランダムなニュースヘッドラインを連続して生成します。

ReplaySubject は、NewsHeadlineFeed クラスの 2 つのニュース フィードをサブスクライブするために作成されます。 件名がフィードにサブスクライブされる前に、Timestamp 演算子を使用して各ヘッドラインのタイムスタンプを設定します。 そのため、ReplaySubject が実際にサブスクライブするシーケンスは、IObservable<Timestamped<文字列>>型です。 ヘッドライン シーケンスのタイムスタンプを使用すると、サブスクライバーはサブジェクトの監視可能なインターフェイスをサブスクライブして、タイムスタンプに基づいてデータ ストリームまたはストリームのサブセットを観察できます。

ReplaySubject は、受信した項目をバッファーに格納します。 そのため、後で作成されたサブスクリプションは、既にバッファーされて発行されているシーケンスの項目にアクセスできます。 サブスクリプションは ReplaySubject に対して作成され、ローカル ニュース サブスクリプションが作成される 10 秒前に発生したローカル ニュースヘッドラインのみを受信します。 そのため、基本的には、10 秒前に発生した ReplaySubject "replay" があります。

ローカル ニュースの見出しには、newsLocation 部分文字列 ("in your area.") が含まれています。

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Threading;

namespace Example
{
  class Program
  {
    static void Main()
    {
      //*****************************************************************************************************//
      //*** A subject acts similar to a proxy in that it acts as both a subscriber and a publisher        ***//
      //*** It's IObserver interface can be used to subscribe to multiple streams or sequences of data.   ***//
      //*** The data is then published through it's IObservable interface.                                ***//
      //***                                                                                               ***//
      //*** In this example a simple ReplaySubject is used to subscribe to multiple news feeds            ***//
      //*** that provide random news headlines. Before the subject is subscribed to the feeds, we use     ***//
      //*** Timestamp operator to timestamp each headline. Subscribers can then subscribe to the subject  ***//
      //*** observable interface to observe the data stream(s) or a subset of the stream(s) based on      ***//
      //*** time.                                                                                         ***//
      //***                                                                                               ***//
      //*** A ReplaySubject buffers items it receives. So a subscription created at a later time can      ***//
      //*** access items from the sequence which have already been published.                             ***//
      //***                                                                                               ***//
      //*** A subscriptions is created to the ReplaySubject that receives only local news headlines which ***//
      //*** occurred 10 seconds before the local news subscription was created. So we basically have the  ***//
      //*** ReplaySubject "replay" what happened 10 seconds earlier.                                      ***//
      //***                                                                                               ***//
      //*** A local news headline just contains the newsLocation substring ("in your area.").             ***//
      //***                                                                                               ***//
      //*****************************************************************************************************//

      ReplaySubject<Timestamped<string>> myReplaySubject = new ReplaySubject<Timestamped<String>>();


      //*****************************************************************//
      //*** Create news feed #1 and subscribe the ReplaySubject to it ***//
      //*****************************************************************//

      NewsHeadlineFeed NewsFeed1 = new NewsHeadlineFeed("Headline News Feed #1");
      NewsFeed1.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);


      //*****************************************************************//
      //*** Create news feed #2 and subscribe the ReplaySubject to it ***//
      //*****************************************************************//

      NewsHeadlineFeed NewsFeed2 = new NewsHeadlineFeed("Headline News Feed #2");
      NewsFeed2.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);


      //*****************************************************************************************************//
      //*** Create a subscription to the subject's observable sequence. This subscription will filter for ***//
      //*** only local headlines that occurred 10 seconds before the subscription was created.            ***//
      //***                                                                                               ***//
      //*** Since we are using a ReplaySubject with timestamped headlines, we can subscribe to the        ***//
      //*** headlines already past. The ReplaySubject will "replay" them for the localNewSubscription     ***//
      //*** from its buffered sequence of headlines.                                                      ***//
      //*****************************************************************************************************//

      Console.WriteLine("Waiting for 10 seconds before subscribing to local news headline feed.\n");
      Thread.Sleep(10000);

      Console.WriteLine("\n*** Creating local news headline subscription at {0} ***\n", DateTime.Now.ToString());
      Console.WriteLine("This subscription asks the ReplaySubject for the buffered headlines that\n" +
                        "occurred within the last 10 seconds.\n\nPress ENTER to exit.", DateTime.Now.ToString());

      DateTime lastestHeadlineTime = DateTime.Now;
      DateTime earliestHeadlineTime = lastestHeadlineTime - TimeSpan.FromSeconds(10);     
      
      IDisposable localNewsSubscription = myReplaySubject.Where(x => x.Value.Contains("in your area.") &&
                                                               (x.Timestamp >= earliestHeadlineTime) &&
                                                               (x.Timestamp < lastestHeadlineTime)).Subscribe(x =>
      {
        Console.WriteLine("\n************************************\n" +
                          "***[ Local news headline report ]***\n" +
                          "************************************\n" + 
                          "Time         : {0}\n{1}\n\n", x.Timestamp.ToString(), x.Value);
      });

      Console.ReadLine();


      //*******************************//
      //*** Cancel the subscription ***//
      //*******************************//

      localNewsSubscription.Dispose();
      

      //*************************************************************************//
      //*** Unsubscribe all the ReplaySubject's observers and free resources. ***//
      //*************************************************************************//

      myReplaySubject.Dispose();    
    }
  }



  //*********************************************************************************//
  //***                                                                           ***//
  //*** The NewsHeadlineFeed class is just a mock news feed in the form of an     ***//
  //*** observable sequence in Reactive Extensions.                               ***//
  //***                                                                           ***//
  //*********************************************************************************//
  class NewsHeadlineFeed
  {
    private string feedName;                     // Feedname used to label the stream
    private IObservable<string> headlineFeed;    // The actual data stream
    private readonly Random rand = new Random(); // Used to stream random headlines.


    //*** A list of predefined news events to combine with a simple location string ***//
    static readonly string[] newsEvents = { "A tornado occurred ",
                                            "Weather watch for snow storm issued ",
                                            "A robbery occurred ",
                                            "We have a lottery winner ",
                                            "An earthquake occurred ",
                                            "Severe automobile accident "};

    //*** A list of predefined location strings to combine with a news event. ***//
    static readonly string[] newsLocations = { "in your area.",
                                               "in Dallas, Texas.",
                                               "somewhere in Iraq.",
                                               "Lincolnton, North Carolina",
                                               "Redmond, Washington"};

    public IObservable<string> HeadlineFeed
    {
      get { return headlineFeed; }
    }

    public NewsHeadlineFeed(string name)
    {
      feedName = name;

      //*****************************************************************************************//
      //*** Using the Generate operator to generate a continous stream of headline that occur ***//
      //*** randomly within 5 seconds.                                                        ***//
      //*****************************************************************************************//
      headlineFeed = Observable.Generate(RandNewsEvent(),
                                         evt => true,
                                         evt => RandNewsEvent(),
                                         evt => { Thread.Sleep(rand.Next(3000)); return evt; },
                                         Scheduler.ThreadPool);
    }


    //****************************************************************//
    //*** Some very simple formatting of the headline event string ***//
    //****************************************************************//
    private string RandNewsEvent()
    {
      return "Feedname     : " + feedName + "\nHeadline     : " + newsEvents[rand.Next(newsEvents.Length)] +
             newsLocations[rand.Next(newsLocations.Length)];
    }
  }
}

コード例を使用して、次の出力が生成されました。 新しいフィードはランダムであるため、ローカルニュースの見出しを表示するために複数回実行する必要がある可能性があります。

Waiting for 10 seconds before subscribing to local news headline feed.

** 2011 年 5 月 9 日午前 4 時 7 分 48 分にローカル ニュースヘッドライン サブスクリプションを作成する **

This subscription asks the ReplaySubject for the buffered headlines that
occurred within the last 10 seconds.

Press ENTER to exit.

**********************************[ ローカルニュースヘッドラインレポート ]********************************** 時間: 5/9/2011 4:07:42 AM -04:00 フィード名: ヘッドラインニュースフィード #2 ヘッドライン: 私たちはあなたの地域で宝くじの勝者を持っています。

**********************************[ ローカルニュースヘッドラインレポート ]**********************************時間 : 5/9/2011 4:07:47 AM -04:00 フィード名 : ヘッドラインニュースフィード #1 ヘッドライン: お客様の地域で発行された雪嵐の天気watch。

参照

リファレンス

ReplaySubject<T> クラス

System.Reactive.Subjects 名前空間