Fixing a gap in a prior post – Finding the Top Category with a Tumbling Window

In a prior blog post (https://blogs.msdn.com/b/appfabriccat/archive/2010/11/03/streaminsight-query-pattern-find-the-top-category-using-order-by-take-and-applywithunion.aspx) I discussed how to determine the top category in a set of events using order by and take. However, I only covered non-overlapping aggregate windows (i.e. Tumbling windows); I did not cover how to adjust the query definition for overlapping (Hopping) windows. Since this question came up in the forums, and it’s somewhat challenging to post a picture there, here’s how to fix the problem.

Let’s start with our “working” top category query.

    ////////////////////////////////////////////////////////////////////////////////////////////
    // Which station (category) reported the most events in each 2-hour Tumbling window?
    ////////////////////////////////////////////////////////////////////////////////////////////
    var weatherData = new[]
    {
        new { Timestamp = DateTime.Parse("1/1/2010 0:00:00"), Temperature = -9.0, StationCode = 71395, WindSpeed = 4 },
        new { Timestamp = DateTime.Parse("1/1/2010 0:30:00"), Temperature = -4.5, StationCode = 71395, WindSpeed = 41 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:00:00"), Temperature = -8.8, StationCode = 71395, WindSpeed = 6 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:30:00"), Temperature = -4.4, StationCode = 71801, WindSpeed = 39 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:00:00"), Temperature = -9.7, StationCode = 71395, WindSpeed = 9 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:30:00"), Temperature = -4.6, StationCode = 71801, WindSpeed = 59 },
        new { Timestamp = DateTime.Parse("1/1/2010 3:00:00"), Temperature = -9.6, StationCode = 71801, WindSpeed = 9 }
    };

    // Turn the sample readings into a stream of point events.
    var weather = weatherData.ToPointStream(Application, t =>
        PointEvent.CreateInsert(t.Timestamp, t),
        AdvanceTimeSettings.IncreasingStartTime);

    // Count the events per station in each 2-hour Tumbling window.
    var eventCount = from e in weather
                     group e by e.StationCode into stationGroups
                     from win in stationGroups.TumblingWindow(
                         TimeSpan.FromHours(2), HoppingWindowOutputPolicy.ClipToWindowEnd)
                     select new
                     {
                         StationCode = stationGroups.Key,
                         Count = win.Count()
                     };

    // Rank the counts and keep the station with the highest count
    // (ties go to the lower station code).
    var topCategory = (from win in eventCount.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       from e1 in win
                       orderby e1.Count descending, e1.StationCode ascending
                       select e1).Take(1);

    topCategory.Dump("top category");

The diagram below shows what is actually happening in terms of the event flow.  The input stream, weather, is split into substreams by station code, and a Tumbling window is applied to each substream.  This calculates the number of events in each category per window, which is then passed to the snapshot window (and then to the order by and take).

image

This works so long as the input events to the snapshot window do not overlap.  If they do, older “calculations” can be counted against the new ranking.   For example, if a Hopping window is used, such as this query:

    ////////////////////////////////////////////////////////////////////////////////////////////
    // Which station (category) reported the most events in each 2-hour Hopping window?
    // (This version produces incorrect rankings because the window results overlap.)
    ////////////////////////////////////////////////////////////////////////////////////////////
    var weatherData = new[]
    {
        new { Timestamp = DateTime.Parse("1/1/2010 0:00:00"), Temperature = -9.0, StationCode = 71395, WindSpeed = 4 },
        new { Timestamp = DateTime.Parse("1/1/2010 0:30:00"), Temperature = -4.5, StationCode = 71395, WindSpeed = 41 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:00:00"), Temperature = -8.8, StationCode = 71395, WindSpeed = 6 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:30:00"), Temperature = -4.4, StationCode = 71801, WindSpeed = 39 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:00:00"), Temperature = -9.7, StationCode = 71395, WindSpeed = 9 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:30:00"), Temperature = -4.6, StationCode = 71801, WindSpeed = 59 },
        new { Timestamp = DateTime.Parse("1/1/2010 3:00:00"), Temperature = -9.6, StationCode = 71801, WindSpeed = 9 }
    };

    // Turn the sample readings into a stream of point events.
    var weather = weatherData.ToPointStream(Application, t =>
        PointEvent.CreateInsert(t.Timestamp, t),
        AdvanceTimeSettings.IncreasingStartTime);

    // Count the events per station in each 2-hour window, hopping forward every hour.
    var eventCount = from e in weather
                     group e by e.StationCode into stationGroups
                     from win in stationGroups.HoppingWindow(
                         TimeSpan.FromHours(2), TimeSpan.FromHours(1),
                         HoppingWindowOutputPolicy.ClipToWindowEnd)
                     select new
                     {
                         StationCode = stationGroups.Key,
                         Count = win.Count()
                     };

    // Rank the counts. The results of consecutive Hopping windows overlap in time,
    // so each snapshot can contain counts from more than one window.
    var topCategory = (from win in eventCount.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       from e1 in win
                       orderby e1.Count descending, e1.StationCode ascending
                       select e1).Take(1);

    topCategory.Dump("top category");

The following effect is observed: at points in the event stream, the results of older Hopping windows are still passed to the snapshot window for ordering, alongside the results of the newer windows.  This can produce incorrect results, as seen in this diagram.

image
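
The overlap can also be reproduced without a StreamInsight server. The following is only a rough sketch in plain LINQ to Objects, not StreamInsight code: it models each Hopping window result as an interval that lives for its whole window, and it assumes the windows start on whole hours. The names readings, results, snapshot, visible and winner are made up for this illustration.

```csharp
using System;
using System.Linq;

// Sample readings as (hours after midnight, station code), taken from weatherData above.
var readings = new (double Hour, int Station)[]
{
    (0.0, 71395), (0.5, 71395), (1.0, 71395), (1.5, 71801),
    (2.0, 71395), (2.5, 71801), (3.0, 71801)
};

// Assumption: 2-hour windows that hop every hour and start on whole hours.
// Each count is modelled as an interval event covering its window [Start, End).
var results =
    (from start in Enumerable.Range(-1, 5)   // window starts 23:00, 00:00, ..., 03:00
     from g in readings.Where(r => r.Hour >= start && r.Hour < start + 2)
                       .GroupBy(r => r.Station)
     select (Start: start, End: start + 2, Station: g.Key, Count: g.Count()))
    .ToList();

// A snapshot at 02:30 sees every result whose lifetime covers that instant,
// so it mixes the 01:00-03:00 window with the 02:00-04:00 window.
const double snapshot = 2.5;
var visible = results.Where(r => r.Start <= snapshot && snapshot < r.End).ToList();

// Same ordering as the query: highest count first, then lowest station code.
var winner = visible.OrderByDescending(r => r.Count)
                    .ThenBy(r => r.Station)
                    .First();

Console.WriteLine($"Results visible at 02:30: {visible.Count}");
Console.WriteLine($"Winner: station {winner.Station}, count {winner.Count}, from the window starting at hour {winner.Start}");
```

Under these assumptions four results are visible at 02:30, and the winner is station 71395 with a count of 2 from the older window starting at 01:00, even though the newest window (02:00 to 04:00) on its own ranks station 71801 first.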

In order to prevent this from happening, we need to stop the output of the hopping windows from overlapping in the snapshot window. There are a number of ways to do this, but the easiest is to convert the output of the Hopping window back into point events via the ToPointEventStream method.

    ////////////////////////////////////////////////////////////////////////////////////////////
    // Which station (category) reported the most events in each 2-hour Hopping window?
    // (Corrected version: the window results are converted to point events before ranking.)
    ////////////////////////////////////////////////////////////////////////////////////////////
    var weatherData = new[]
    {
        new { Timestamp = DateTime.Parse("1/1/2010 0:00:00"), Temperature = -9.0, StationCode = 71395, WindSpeed = 4 },
        new { Timestamp = DateTime.Parse("1/1/2010 0:30:00"), Temperature = -4.5, StationCode = 71395, WindSpeed = 41 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:00:00"), Temperature = -8.8, StationCode = 71395, WindSpeed = 6 },
        new { Timestamp = DateTime.Parse("1/1/2010 1:30:00"), Temperature = -4.4, StationCode = 71801, WindSpeed = 39 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:00:00"), Temperature = -9.7, StationCode = 71395, WindSpeed = 9 },
        new { Timestamp = DateTime.Parse("1/1/2010 2:30:00"), Temperature = -4.6, StationCode = 71801, WindSpeed = 59 },
        new { Timestamp = DateTime.Parse("1/1/2010 3:00:00"), Temperature = -9.6, StationCode = 71801, WindSpeed = 9 }
    };

    // Turn the sample readings into a stream of point events.
    var weather = weatherData.ToPointStream(Application, t =>
        PointEvent.CreateInsert(t.Timestamp, t),
        AdvanceTimeSettings.IncreasingStartTime);

    // Count the events per station in each 2-hour window, hopping forward every hour.
    var eventCount = from e in weather
                     group e by e.StationCode into stationGroups
                     from win in stationGroups.HoppingWindow(
                         TimeSpan.FromHours(2), TimeSpan.FromHours(1),
                         HoppingWindowOutputPolicy.ClipToWindowEnd)
                     select new
                     {
                         StationCode = stationGroups.Key,
                         Count = win.Count()
                     };

    // Convert the window results to point events first, so that results from
    // different Hopping windows no longer overlap inside the snapshot window.
    var topCategory = (from win in eventCount.ToPointEventStream()
                           .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       from e1 in win
                       orderby e1.Count descending, e1.StationCode ascending
                       select e1).Take(1);

    topCategory.Dump("top category");

This produces the desired results:

image
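
To see why point events help, here is a rough sketch in plain LINQ to Objects (again, not StreamInsight code). It assumes the Hopping windows start on whole hours and models the conversion to point events by keeping only each result's window start, so results are ranked only against others from the same window. The names readings, results and winners are made up for this illustration.

```csharp
using System;
using System.Linq;

// Sample readings as (hours after midnight, station code), taken from weatherData above.
var readings = new (double Hour, int Station)[]
{
    (0.0, 71395), (0.5, 71395), (1.0, 71395), (1.5, 71801),
    (2.0, 71395), (2.5, 71801), (3.0, 71801)
};

// Assumption: 2-hour windows that hop every hour and start on whole hours.
// Each result is stamped only with its window start, i.e. treated as a point event.
var results =
    (from start in Enumerable.Range(-1, 5)   // window starts 23:00, 00:00, ..., 03:00
     from g in readings.Where(r => r.Hour >= start && r.Hour < start + 2)
                       .GroupBy(r => r.Station)
     select (Start: start, Station: g.Key, Count: g.Count()))
    .ToList();

// Results sharing a timestamp come from the same window, so each ranking
// only compares counts calculated over the same two hours.
var winners =
    (from r in results
     group r by r.Start into sameWindow
     orderby sameWindow.Key
     select sameWindow.OrderByDescending(r => r.Count)
                      .ThenBy(r => r.Station)
                      .First())
    .ToList();

foreach (var w in winners)
{
    Console.WriteLine($"Window starting at hour {w.Start}: station {w.Station} with {w.Count} event(s)");
}
```

Under these assumptions the window starting at 02:00 is now won by station 71801 (2 events against 1), where the overlapping version could report station 71395 because of a stale count. The window starting at 01:00 is a 2 to 2 tie, which the ordering resolves in favour of the lower station code.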

Note that the diagrams don’t entirely line up; I’ll polish this up a bit and add some information on tie-breaking before posting to the AppFabric CAT blog site next week.