StreamInsight: Reading from other data contexts into StreamInsight with LINQPad
The LINQPad driver for StreamInsight provides an awesomely easy way to run StreamInsight queries against readily accessible data contexts of temporal streams. However, because LINQPad supports only a single active data context connection per query, it can be difficult to combine StreamInsight with a source that is hard to read without a data context (such as OData).
As LINQPad does enable execution of generic .NET code, we can use a little trick to get around the single-connection limitation: simply place the embedded server creation code (i.e. Server.Create()) directly into a LINQPad query statement, regardless of the active context.
Step 1: Reading an OData Service in LINQPad
We’ll start off by importing the Northwind data set as an OData service, then converting the Orders table into a CepStream. For more background on how to work with the new (as of 1.1) IEnumerable/IObservable adapters for StreamInsight refer to Colin Meek’s excellent StreamInsight Sequence Integration: Five Easy Pieces post.
- Start a copy of LINQPad with the StreamInsight driver installed. Refer to the StreamInsight LINQPad Driver post if you haven’t yet done this.
- From the data context list, click on Add Connection. From the Choose Data Context dialog, select WCF Data Services (OData) and click Next.
- Type the URI https://services.odata.org/Northwind/Northwind.svc into the LINQPad connection dialog, and click OK.
The Northwind OData context is now available for use in LINQPad. One question that might immediately crop up: why didn’t I use the StreamInsight data context instead of the OData context? LINQPad, unfortunately, doesn’t yet support multiple data contexts in a single query. Since we need to pull data from one context (LINQ to OData) and process it in another (StreamInsight), and the LINQ to OData context is rather challenging to create on the fly, we’ll use Northwind as our primary context and dynamically generate a StreamInsight embedded server.
- The Northwind data context should now be available in the list of connections. From the query pane, select C# Statement(s) as the language, and the Northwind URI as the database. We use C# Statement instead of C# Expression to let us build up more complicated objects and code (such as the StreamInsight embedded server object).
Let’s run a simple query to make sure that the data feed is flowing through appropriately. Type the code in the block below into the query window and press F5 to execute the query.
    // Retrieve orders from the Northwind OData feed that
    // have valid shipping dates and order dates. Select
    // fields to be used in the StreamInsight query (basic
    // types only)
    var orders = from o in Orders
                 where o.ShippedDate != null && o.OrderDate != null
                 orderby o.OrderDate ascending
                 select new
                 {
                     CustomerName = o.Customer.CompanyName,
                     OrderDate = (DateTime)o.OrderDate,
                     ShippedDate = (DateTime)o.ShippedDate,
                     Region = o.ShipRegion
                 };
    orders.Dump();
- The results should be similar to those shown below. Once you’ve validated that the OData feed is being consumed properly, delete the orders.Dump() call.
IOrderedQueryable<> (200 items)
CustomerName | OrderDate | ShippedDate | Region |
Vins et alcools Chevalier | 7/4/1996 0:00 | 7/16/1996 0:00 | null |
Toms Spezialitäten | 7/5/1996 0:00 | 7/10/1996 0:00 | null |
Hanari Carnes | 7/8/1996 0:00 | 7/12/1996 0:00 | RJ |
Victuailles en stock | 7/8/1996 0:00 | 7/15/1996 0:00 | null |
Suprêmes délices | 7/9/1996 0:00 | 7/11/1996 0:00 | null |
Hanari Carnes | 7/10/1996 0:00 | 7/16/1996 0:00 | RJ |
Chop-suey Chinese | 7/11/1996 0:00 | 7/23/1996 0:00 | null |
Richter Supermarkt | 7/12/1996 0:00 | 7/15/1996 0:00 | null |
Wellington Importadora | 7/15/1996 0:00 | 7/17/1996 0:00 | SP |
HILARION-Abastos | 7/16/1996 0:00 | 7/22/1996 0:00 | Táchira |
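To make the shape of that query easier to see outside LINQPad, here is a minimal sketch that runs the same filter, ordering and projection against an in-memory list with LINQ to Objects. The SampleOrder class, the OrderProjectionSketch class and the third (unshipped) order are hypothetical stand-ins, not part of the Northwind feed; the first two rows are taken from the results above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the OData Order entity (only the fields used here).
public class SampleOrder
{
    public string CompanyName { get; set; }
    public DateTime? OrderDate { get; set; }
    public DateTime? ShippedDate { get; set; }
    public string ShipRegion { get; set; }
}

public static class OrderProjectionSketch
{
    // Two rows from the results above, plus one made-up order that has not shipped.
    public static List<SampleOrder> Sample()
    {
        return new List<SampleOrder>
        {
            new SampleOrder { CompanyName = "Hanari Carnes", OrderDate = new DateTime(1996, 7, 8),
                              ShippedDate = new DateTime(1996, 7, 12), ShipRegion = "RJ" },
            new SampleOrder { CompanyName = "Vins et alcools Chevalier", OrderDate = new DateTime(1996, 7, 4),
                              ShippedDate = new DateTime(1996, 7, 16), ShipRegion = null },
            new SampleOrder { CompanyName = "Unshipped (hypothetical)", OrderDate = new DateTime(1996, 7, 6),
                              ShippedDate = null, ShipRegion = null }
        };
    }

    // Same filter, ordering and projection as the LINQPad query, on in-memory data.
    // The null check is what makes reading .Value (the cast in the query) safe.
    public static List<Tuple<string, DateTime, DateTime, string>> Project(IEnumerable<SampleOrder> source)
    {
        return (from o in source
                where o.ShippedDate != null && o.OrderDate != null
                orderby o.OrderDate ascending
                select Tuple.Create(o.CompanyName, o.OrderDate.Value, o.ShippedDate.Value, o.ShipRegion))
               .ToList();
    }

    public static void Main()
    {
        foreach (var row in Project(Sample()))
            Console.WriteLine("{0} | {1:d} | {2:d} | {3}", row.Item1, row.Item2, row.Item3, row.Item4 ?? "null");
    }
}
```

The unshipped order is dropped by the where clause, so only non-nullable dates reach the projection, which is what the StreamInsight conversion in Step 2 relies on.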
Step 2: Creating a StreamInsight host and a basic query
Now that we have the OData feed available in LINQPad, let’s set things up so that we can create and execute StreamInsight queries without a StreamInsight data context. A data context or data connection makes this a lot easier, but since we also need the Northwind OData context, one of the two has to be defined manually.
- In LINQPad, click on the Query menu item, then on Query Properties (or press F4).
- In the Query Properties dialog, add the following references to the Additional References list.
- Microsoft.ComplexEventProcessing.dll
- Microsoft.ComplexEventProcessing.Observable.dll
- Click on the Additional Namespace Imports tab, and add the following namespaces.
- Microsoft.ComplexEventProcessing
- Microsoft.ComplexEventProcessing.Linq
Since this won’t be the only time (I hope) that we use this technique, go ahead and click on the Set as default for new queries button, click OK to acknowledge the popup, then click OK to close the Query Properties dialog.
Now we can go ahead and define an embedded StreamInsight server to process our queries, by pasting the code below into the LINQPad query window.
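    // Create an embedded StreamInsight server; it is disposed
    // when the using block exits
    using (Server server = Server.Create("Default"))
    {
        Application app = server.CreateApplication("test");
        // The rest of the StreamInsight code goes here
    }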
Using the techniques described in Colin’s excellent post on using sequences with StreamInsight v1.1, let’s convert the set of orders into a temporal stream. The code below, which belongs inside the using block right after the application is created, shows how to obtain a CepStream<> of intervals from the enumerable set of orders.
    // Convert the set of orders into an interval stream
    var orderStream = orders.ToIntervalStream(app, t =>
        IntervalEvent.CreateInsert((DateTimeOffset)t.OrderDate, (DateTimeOffset)t.ShippedDate, t),
        AdvanceTimeSettings.IncreasingStartTime);
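One detail of that conversion is worth a closer look: casting a DateTime to DateTimeOffset. In .NET, when the DateTime's Kind is Unspecified or Local, the conversion applies the local machine's UTC offset, which is likely why the results further down show 7:00 AM +00:00 rather than midnight. The sketch below is plain .NET with no StreamInsight dependency; the OrderTimestampSketch class and the AsUtc helper are hypothetical names, and treating the feed's dates as UTC is an assumption, not something the original query does.

```csharp
using System;

public static class OrderTimestampSketch
{
    // Hypothetical helper: interpret a date from the feed as UTC so the
    // resulting offset is zero regardless of the machine's time zone.
    public static DateTimeOffset AsUtc(DateTime value)
    {
        return new DateTimeOffset(DateTime.SpecifyKind(value, DateTimeKind.Utc));
    }

    public static void Main()
    {
        // Kind is Unspecified, as with a DateTime built from date parts.
        var orderDate = new DateTime(1996, 7, 8);

        // What the cast in the query does: uses the local time zone offset.
        DateTimeOffset viaCast = (DateTimeOffset)orderDate;

        // Time-zone-independent alternative (an assumption about intent).
        DateTimeOffset viaUtc = AsUtc(orderDate);

        Console.WriteLine("Cast:  {0}", viaCast);
        Console.WriteLine("AsUtc: {0}", viaUtc);
    }
}
```

If you want event times that do not shift with the machine's time zone, a helper along these lines could replace the two casts inside IntervalEvent.CreateInsert.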
Now we’re ready to write our basic query. Let’s look for periods in which a region has several outstanding orders at once, i.e. orders whose order-date to ship-date intervals overlap. The filter below keeps the snapshots with more than two such orders.
    // Group the order stream by region, and create
    // snapshot windows to look at the sets of overlapping
    // orders. Count the number of orders.
    var regionCounts = from o in orderStream
                       group o by o.Region into regionGroups
                       from win in regionGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       select new { ShipRegion = regionGroups.Key, Count = win.Count() };
    // Look for regions which have more than 2 overlapping
    // orders
    var filter = from e in regionCounts
                 where e.Count > 2
                 select e;
    // Dump the results of the query out into an enumerable
    var snk = from i in filter.ToIntervalEnumerable()
              where i.EventKind == EventKind.Insert
              select new { i.StartTime, i.EndTime, i.Payload.ShipRegion, i.Payload.Count };
    snk.Dump();
Let’s run the query (by pressing F5) and observe the results.
IEnumerable<> (130 items)
StartTime | EndTime | ShipRegion | Count |
7/8/1996 7:00:00 AM +00:00 | 7/9/1996 7:00:00 AM +00:00 | null | 3 |
7/9/1996 7:00:00 AM +00:00 | 7/10/1996 7:00:00 AM +00:00 | null | 4 |
7/10/1996 7:00:00 AM +00:00 | 7/11/1996 7:00:00 AM +00:00 | null | 3 |
7/11/1996 7:00:00 AM +00:00 | 7/12/1996 7:00:00 AM +00:00 | null | 3 |
7/12/1996 7:00:00 AM +00:00 | 7/15/1996 7:00:00 AM +00:00 | null | 4 |
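To get a feel for what the snapshot window is doing, the counts can be approximated with plain LINQ to Objects. The sketch below is not StreamInsight and not how the engine is implemented; it simply treats each order as a half-open interval [Start, End), cuts time at every start or end within a region, and counts the orders that are live in each slice. The class and method names are hypothetical, and the data is the first ten orders from the Step 1 results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotSketch
{
    // One order, treated as the half-open interval [Start, End) in a region.
    public sealed class Order
    {
        public readonly DateTime Start, End;
        public readonly string Region;
        public Order(DateTime start, DateTime end, string region)
        { Start = start; End = end; Region = region; }
    }

    // One output row: a span during which the set of live orders is constant.
    public sealed class Snapshot
    {
        public readonly DateTime Start, End;
        public readonly string Region;
        public readonly int Count;
        public Snapshot(DateTime start, DateTime end, string region, int count)
        { Start = start; End = end; Region = region; Count = count; }
    }

    // Per region, split time at every start/end and keep slices whose
    // live-order count exceeds the threshold.
    public static List<Snapshot> CountOverlaps(IEnumerable<Order> orders, int threshold)
    {
        var result = new List<Snapshot>();
        foreach (var region in orders.GroupBy(o => o.Region))
        {
            var edges = region.SelectMany(o => new[] { o.Start, o.End })
                              .Distinct().OrderBy(t => t).ToList();
            for (int i = 0; i + 1 < edges.Count; i++)
            {
                DateTime left = edges[i], right = edges[i + 1];
                // An order is live in the slice if it covers the whole slice.
                int live = region.Count(o => o.Start <= left && o.End >= right);
                if (live > threshold)
                    result.Add(new Snapshot(left, right, region.Key, live));
            }
        }
        return result.OrderBy(s => s.Start).ToList();
    }

    static DateTime D(int month, int day) { return new DateTime(1996, month, day); }

    // The first ten orders from the Step 1 results (order date, ship date, region).
    public static List<Order> SampleOrders()
    {
        return new List<Order>
        {
            new Order(D(7, 4), D(7, 16), null), new Order(D(7, 5), D(7, 10), null),
            new Order(D(7, 8), D(7, 12), "RJ"), new Order(D(7, 8), D(7, 15), null),
            new Order(D(7, 9), D(7, 11), null), new Order(D(7, 10), D(7, 16), "RJ"),
            new Order(D(7, 11), D(7, 23), null), new Order(D(7, 12), D(7, 15), null),
            new Order(D(7, 15), D(7, 17), "SP"), new Order(D(7, 16), D(7, 22), "Táchira")
        };
    }

    public static void Main()
    {
        foreach (var s in CountOverlaps(SampleOrders(), 2))
            Console.WriteLine("{0:d} | {1:d} | {2} | {3}", s.Start, s.End, s.Region ?? "null", s.Count);
    }
}
```

On those ten rows the sketch yields five slices for the null region with counts 3, 4, 3, 3, 4, covering 7/8 through 7/15, which lines up with the first five rows of the StreamInsight output above (apart from the time-of-day offset).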
An interesting result, combining LINQPad exploration of a WCF Data Services (OData) feed with StreamInsight. But what went into producing that result? Which collection of events resulted in a given output event? Answering that question will be the subject of my next post, on using the Event Flow Debugger with LINQPad and StreamInsight.
For reference, here is the complete query:
    // Retrieve orders from the Northwind OData feed that
    // have valid shipping dates and order dates. Select
    // fields to be used in the StreamInsight query (basic
    // types only)
    var orders = from o in Orders
                 where o.ShippedDate != null && o.OrderDate != null
                 orderby o.OrderDate ascending
                 select new
                 {
                     CustomerName = o.Customer.CompanyName,
                     OrderDate = (DateTime)o.OrderDate,
                     ShippedDate = (DateTime)o.ShippedDate,
                     Region = o.ShipRegion
                 };
    using (Server server = Server.Create("Default"))
    {
        Application app = server.CreateApplication("test");
        // Convert the set of orders into an interval stream
        var orderStream = orders.ToIntervalStream(app, t =>
            IntervalEvent.CreateInsert((DateTimeOffset)t.OrderDate, (DateTimeOffset)t.ShippedDate, t),
            AdvanceTimeSettings.IncreasingStartTime);
        // Group the order stream by region, and create
        // snapshot windows to look at the sets of overlapping
        // orders. Count the number of orders.
        var regionCounts = from o in orderStream
                           group o by o.Region into regionGroups
                           from win in regionGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                           select new { ShipRegion = regionGroups.Key, Count = win.Count() };
        // Look for regions which have more than 2 overlapping
        // orders
        var filter = from e in regionCounts
                     where e.Count > 2
                     select e;
        // Dump the results of the query out into an enumerable
        var snk = from i in filter.ToIntervalEnumerable()
                  where i.EventKind == EventKind.Insert
                  select new { i.StartTime, i.EndTime, i.Payload.ShipRegion, i.Payload.Count };
        snk.Dump();
    }