Creating a custom event filter

In some scenarios, you may want to interact with the stream of events arriving at one of the Semantic Logging Application Block event sinks. For example, you may want to initiate other processes or take specific actions when certain events arrive, before one of the sinks that you specify in your application stores the events.

The sink classes included with the block subscribe to event listeners that implement the IObservable interface. By creating custom code that subscribes to an IObservable instance, you can manipulate the events before they are seen by the sink. Your code then generates a new stream that an IObserver instance, such as one of the existing sinks in the block or a custom sink that you create, can subscribe to.
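
The idea of placing custom code between the listener and the sink can be sketched with the base class library interfaces alone. The following is a minimal illustration, not code from the block: FilteredStream, ArraySource, and ListObserver are hypothetical names, and the last two exist only so that the sketch can be exercised without a real listener or sink.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical intermediate stage (not part of the block): it subscribes to a
// source stream and exposes a new stream that forwards only matching items.
public sealed class FilteredStream<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Func<T, bool> predicate;

    public FilteredStream(IObservable<T> source, Func<T, bool> predicate)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (predicate == null) throw new ArgumentNullException("predicate");
        this.source = source;
        this.predicate = predicate;
    }

    // Each downstream subscriber gets its own subscription to the source.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        return this.source.Subscribe(new FilteringObserver(observer, this.predicate));
    }

    private sealed class FilteringObserver : IObserver<T>
    {
        private readonly IObserver<T> downstream;
        private readonly Func<T, bool> predicate;

        public FilteringObserver(IObserver<T> downstream, Func<T, bool> predicate)
        {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        public void OnNext(T value)
        {
            // Drop items that do not match; pass the rest through unchanged.
            if (this.predicate(value)) this.downstream.OnNext(value);
        }

        public void OnError(Exception error) { this.downstream.OnError(error); }
        public void OnCompleted() { this.downstream.OnCompleted(); }
    }
}

// Demo-only source: replays a fixed set of items to each subscriber.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] items;
    public ArraySource(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in this.items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

// Demo-only observer: stands in for a sink and records what it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();
    public bool Completed;
    public void OnNext(T value) { this.Received.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { this.Completed = true; }
}
```

Because the intermediate stage is itself an IObservable, anything that can subscribe to the original stream can subscribe to the filtered one instead. In practice, Rx operators usually replace hand-written classes like these.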

The example shown here illustrates how you can use Reactive Extensions (Rx) to build a custom filter that controls which events are sent to the sinks in in-process scenarios. It shows how you can buffer some log messages until a specific event takes place.

In this example, no informational messages are sent to the sink until the listener receives an error message. When this happens, the listener sends the last ten buffered informational messages and the error message to the sink. This is useful if, for the majority of the time, you are not interested in the informational messages but, when an error occurs, you want to see the most recent informational messages that might have led to that error.

The following code sample shows the FlushOnTrigger extension method that implements this behavior. It uses a custom class called CircularBuffer (not shown here) to hold the most recent messages.

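using System;
using System.Reactive.Linq;   // Observable.Create; requires the Rx NuGet package

// Extension methods must be declared in a non-generic static class.
// The class name used here is illustrative.
public static class EventStreamExtensions
{
  public static IObservable<T> FlushOnTrigger<T>(
    this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize)
  {
    return Observable.Create<T>(observer =>
    {
      // Each subscriber gets its own buffer of the most recent items.
      var buffer = new CircularBuffer<T>(bufferSize);
      var subscription = stream.Subscribe(newItem =>
        {
          if (shouldFlush(newItem))
          {
            // Trigger: send the buffered items first, then the trigger item.
            foreach (var buffered in buffer.TakeAll())
            {
              observer.OnNext(buffered);
            }

            observer.OnNext(newItem);
          }
          else
          {
            // Not a trigger: hold the item back instead of forwarding it.
            buffer.Add(newItem);
          }
        },
        observer.OnError,
        observer.OnCompleted);

      // Disposing the returned subscription also unsubscribes from the source.
      return subscription;
    });
  }
}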
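
The CircularBuffer class is not part of this topic, so the following is only a minimal sketch of what such a helper might look like. It assumes that Add discards the oldest item once the buffer is full and that TakeAll returns the buffered items oldest first and empties the buffer. It wraps a bounded Queue rather than a true ring array, and it is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the CircularBuffer<T> helper used by FlushOnTrigger;
// not the original implementation. Not thread-safe.
public sealed class CircularBuffer<T>
{
    private readonly Queue<T> items;
    private readonly int capacity;

    public CircularBuffer(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        this.capacity = capacity;
        this.items = new Queue<T>(capacity);
    }

    // Adds an item, discarding the oldest one when the buffer is full.
    public void Add(T item)
    {
        if (this.items.Count == this.capacity)
        {
            this.items.Dequeue();
        }

        this.items.Enqueue(item);
    }

    // Returns the buffered items, oldest first, and empties the buffer.
    public T[] TakeAll()
    {
        var snapshot = this.items.ToArray();
        this.items.Clear();
        return snapshot;
    }
}
```

Emptying the buffer in TakeAll means that a second trigger arriving shortly after the first does not resend messages that have already been flushed.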

The following code sample shows how to use this method when you are initializing a sink. Instead of subscribing the sink directly to the listener, you invoke the FlushOnTrigger extension method on the listener; it creates the new stream of events that the sink subscribes to.

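var listener = new ObservableEventListener();
listener.EnableEvents(MyCompanyEventSource.Log, EventLevel.Informational, Keywords.All);

// Lower EventLevel values are more severe, so "<= EventLevel.Error" matches
// Error and Critical events, which act as the flush trigger.
listener
  .FlushOnTrigger(entry => entry.Schema.Level <= EventLevel.Error, bufferSize: 10)
  .LogToConsole();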

Note

The Semantic Logging Application Block does not have a dependency on Reactive Extensions (Rx), but this specific example does require you to add the Rx NuGet package to your project because it uses the Observable class and the utilities that the Rx library contains. For more information about using Rx, see The Reactive Extensions (Rx) on MSDN.
