StreamInsight Query Patterns: Fun with process data – calculating the wind chill factor
Process data, commonly accessed from OPC or a process historian such as OSIsoft PI, is well suited for analysis using StreamInsight – being time-series data with data values from a range of sensors. In this post, I’ll walk through a couple of simple patterns for working with process data (not that these patterns are isolated to process data).
To follow along, the LINQ query for this example is here (if you haven’t used LINQPad with StreamInsight yet, follow the directions here to get up and running).
The general case for collecting process data is a range of sensors (temperature, pressure, humidity, flow, etc.) connected to process control equipment (Programmable Logic Controllers, or PLCs, SCADA systems, etc.) that in turn relay the information upstream to process historians (or provide a direct data-gathering interface through OPC or a similar protocol).
The reason I mention this level of detail has to do with how process data typically arrives: in a single, mixed stream. Because connections to these embedded devices tend to be 'expensive', we want to minimize connections and communication and pull multiple data values over a single connection. In StreamInsight this is often surfaced as a single CepStream<T> (rather than, for example, temperature and humidity information coming from two different data sources in two different streams). This is why I have intermingled data types in my examples below, and why the query patterns usually start by creating separate virtual streams based on a data type.
For these examples, let's consider a representative process control data type with some simple fields:
- string Id
- DateTime Timestamp
- double Value
Next, assume that the Id field uses an "asset.sensor" naming convention (e.g. Station1.Temperature and Station1.Windspeed). Also assume that the data updates are not guaranteed to be synchronized (i.e. the underlying PLC may not report updates to temperature and humidity at the same time, nor on the same schedule). For the first query, we want to answer the question:
“What is the current wind chill factor” (or is it a dry cold - the opposite side of the coin from the humidex)
This breaks down into answering two questions:
- For each station, what is the latest value of both temperature and wind speed?
- Every time I have a new temperature or wind speed value, I need to calculate a new wind chill factor.
Let's start by examining the first question. Whenever we see a question involving the latest value, we need to take a series of point or interval events and convert them into a signal (refer to Point to Signal conversion on MSDN).
This is a timeline representation of what we’re looking to do. We need to:
- Create separate streams for the temperature values and wind speed values, deriving the join key from the prefix of the Id (the asset name).
- Convert each stream from a series of point events into a signal stream using AlterEventDuration and ClipEventDuration.
- Join the two signals on that key and, for each joined event (i.e. each time we receive a new value from either sub-stream), calculate a new wind chill.
For the purposes of this example, assume that all temperatures are in °F and all wind speeds are in mph.
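To make the first two steps concrete without a StreamInsight server, here is a minimal sketch in plain LINQ-to-objects. It approximates the semantics only and is not the StreamInsight API; the helper name ToSignal, the tuple shapes and the three readings are hypothetical. It keeps one sensor type, keys each reading by the asset prefix of the Id, and gives each reading a lifetime that ends when the next reading for the same asset arrives, capped at one hour, which is the effect the AlterEventDuration/ClipEventDuration pair has in the query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical readings in the post's "asset.sensor" shape (a subset of its sample data).
var readings = new[]
{
    (Id: "Station10.Temperature", Timestamp: DateTime.Parse("2009-10-23T09:00:00"), Value: 20.1),
    (Id: "Station10.Windspeed",   Timestamp: DateTime.Parse("2009-10-23T09:00:05"), Value: 30.1),
    (Id: "Station10.Temperature", Timestamp: DateTime.Parse("2009-10-23T09:00:30"), Value: 18.1),
};

var temperatureSignal = ToSignal(readings, ".Temperature", TimeSpan.FromHours(1));
foreach (var s in temperatureSignal)
    Console.WriteLine($"{s.Key} {s.Start:HH:mm:ss} -> {s.End:HH:mm:ss} value {s.Value}");

// Plain LINQ-to-objects approximation (NOT the StreamInsight API) of steps 1 and 2.
static List<(string Key, DateTime Start, DateTime End, double Value)> ToSignal(
    IEnumerable<(string Id, DateTime Timestamp, double Value)> points,
    string sensorSuffix,
    TimeSpan maxLifetime)
{
    var result = new List<(string Key, DateTime Start, DateTime End, double Value)>();

    // Step 1: keep one sensor type and key it by the asset prefix ("Station10").
    var byAsset = points
        .Where(p => p.Id.EndsWith(sensorSuffix))
        .GroupBy(p => p.Id.Substring(0, p.Id.IndexOf('.')));

    foreach (var group in byAsset)
    {
        var ordered = group.OrderBy(p => p.Timestamp).ToList();
        for (int i = 0; i < ordered.Count; i++)
        {
            // Step 2a (like AlterEventDuration): stretch the point to maxLifetime.
            var start = ordered[i].Timestamp;
            var end = start + maxLifetime;
            // Step 2b (like ClipEventDuration): end it when the next reading for this asset arrives.
            if (i + 1 < ordered.Count && ordered[i + 1].Timestamp < end)
                end = ordered[i + 1].Timestamp;
            result.Add((group.Key, start, end, ordered[i].Value));
        }
    }
    return result;
}
```

With these inputs the sketch yields two temperature intervals for Station10: the 20.1 reading lasts until the 18.1 reading arrives at 9:00:30, and the last reading stays valid for the full hour because nothing newer clips it.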
```csharp
/////////////////////////////////////////////////////////////////////////////
// Create some sample values for both temperature and wind sensor
// measurements (DateTime.Parse uses the current culture; these strings
// assume a US M/d/yyyy format)
/////////////////////////////////////////////////////////////////////////////
var sensorValues = new[]
{
    new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:00 AM"), Value = 20.1 },
    new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:30 AM"), Value = 18.1 },
    new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:45 AM"), Value = 17.1 },
    new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:01:00 AM"), Value = 22.5 },
    new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:05 AM"), Value = 30.1 },
    new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:25 AM"), Value = 40.4 },
    new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:35 AM"), Value = 50.3 },
    new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:01:55 AM"), Value = 40.6 },
};

/////////////////////////////////////////////////////////////////////////////
// Convert the input set into a temporal stream by using the Timestamp
// field (and ordering the incoming data)
var inputStream = sensorValues.OrderBy(e => e.Timestamp)
    .ToPointStream(Application, e =>
        PointEvent.CreateInsert(e.Timestamp, e),
        AdvanceTimeSettings.StrictlyIncreasingStartTime);

// Create a temperature stream, and assign the ID as the component of the
// ID before the .Temperature
var temperatureStream = from e in inputStream
                        where e.Id.EndsWith(".Temperature")
                        select new
                        {
                            Id = e.Id.Substring(0, e.Id.IndexOf(".")),
                            Timestamp = e.Timestamp,
                            Value = e.Value
                        };

// Create a windspeed stream, and assign the ID as the component of the
// ID before the .Windspeed
var windspeedStream = from e in inputStream
                      where e.Id.EndsWith(".Windspeed")
                      select new
                      {
                          Id = e.Id.Substring(0, e.Id.IndexOf(".")),
                          Timestamp = e.Timestamp,
                          Value = e.Value
                      };

// TEMP: dump the raw streams
//temperatureStream.Dump("temps");
//windspeedStream.Dump("wind");

// Convert the temperature and wind speed streams into signals (i.e. remember
// the last known value): stretch each point to one hour, then clip it at the
// next reading for the same station. Assumes at least one update every hour.
var temperatureSignal = temperatureStream
    .AlterEventDuration(e => TimeSpan.FromHours(1))
    .ClipEventDuration(temperatureStream, (e1, e2) => (e1.Id == e2.Id));
var windspeedSignal = windspeedStream
    .AlterEventDuration(e => TimeSpan.FromHours(1))
    .ClipEventDuration(windspeedStream, (e1, e2) => (e1.Id == e2.Id));

// TEMP: dump out the signal streams
//var sinkTemp = from p in temperatureSignal.ToIntervalEnumerable()
//               where p.EventKind == EventKind.Insert
//               select new { p.Payload.Value, p.Payload.Id, p.StartTime, p.EndTime };
//sinkTemp.Dump("temp dump");
//
//var windTemp = from p in windspeedSignal.ToIntervalEnumerable()
//               where p.EventKind == EventKind.Insert
//               select new { p.Payload.Value, p.Payload.Id, p.StartTime, p.EndTime };
//windTemp.Dump("wind dump");

// Join the two streams based on ID - the output will be an event containing
// the last known values for both Temperature and Windspeed.
var measurements = from e1 in temperatureSignal
                   join e2 in windspeedSignal on e1.Id equals e2.Id
                   select new
                   {
                       Id = e1.Id,
                       Temperature = e1.Value,
                       Windspeed = e2.Value
                   };

// TEMP - dump out the joined values
var measures = from p in measurements.ToIntervalEnumerable()
               where p.EventKind == EventKind.Insert
               select new { p.Payload.Temperature, p.Payload.Windspeed, p.Payload.Id, p.StartTime, p.EndTime };
measures.Dump("measures");
```
This LINQ snippet implements steps one and two, plus the join from step three, giving us the joined result of the last known values for each sub-stream.
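Why does joining two signals yield "last known value" pairs? A StreamInsight join emits an output event for each pair of matching input events whose lifetimes overlap, and that output's lifetime is the intersection of the two. The sketch below restates that rule in plain C# for a single station (so the Id equality is omitted). It is an illustration, not the StreamInsight API; JoinSignals is a hypothetical helper, and the intervals are the first few clipped lifetimes from the sample data.

```csharp
using System;
using System.Collections.Generic;

// Clipped lifetimes for Station10's first readings: (Start, End, Value), End exclusive.
var t0 = DateTime.Parse("2009-10-23T09:00:00");
var temperature = new List<(DateTime Start, DateTime End, double Value)>
{
    (t0, t0.AddSeconds(30), 20.1),
    (t0.AddSeconds(30), t0.AddSeconds(45), 18.1),
};
var windspeed = new List<(DateTime Start, DateTime End, double Value)>
{
    (t0.AddSeconds(5), t0.AddSeconds(25), 30.1),
    (t0.AddSeconds(25), t0.AddSeconds(35), 40.4),
};

var joined = JoinSignals(temperature, windspeed);
foreach (var j in joined)
    Console.WriteLine($"{j.Start:HH:mm:ss}-{j.End:HH:mm:ss} T={j.Temperature} V={j.Windspeed}");

// Interval-join rule in plain C# (hypothetical helper, not the StreamInsight API):
// every overlapping pair produces one event whose lifetime is the intersection.
static List<(DateTime Start, DateTime End, double Temperature, double Windspeed)> JoinSignals(
    List<(DateTime Start, DateTime End, double Value)> left,
    List<(DateTime Start, DateTime End, double Value)> right)
{
    var output = new List<(DateTime Start, DateTime End, double Temperature, double Windspeed)>();
    foreach (var l in left)
        foreach (var r in right)
        {
            var start = l.Start > r.Start ? l.Start : r.Start;
            var end = l.End < r.End ? l.End : r.End;
            if (start < end) // half-open intervals must genuinely overlap
                output.Add((start, end, l.Value, r.Value));
        }
    output.Sort((a, b) => a.Start.CompareTo(b.Start));
    return output;
}
```

With these inputs the sketch emits three events: 9:00:05-9:00:25 (20.1, 30.1), 9:00:25-9:00:30 (20.1, 40.4) and 9:00:30-9:00:35 (18.1, 40.4). A new pair starts each time either sensor reports, and nothing is emitted before the first wind reading arrives.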
Now that we have the correlated temperature and wind speed in a single event, we can calculate the wind chill factor for each data update. The algorithm we'll use, as defined by the National Weather Service, is

$$\text{Wind Chill} = 35.74 + 0.6215\,T - 35.75\,V^{0.16} + 0.4275\,T\,V^{0.16}$$

with T being the temperature value (e1.Value, in °F) and V the wind speed (e2.Value, in mph). We add this to the projection as
```csharp
// Join the two streams based on ID - the output will be an event containing
// the last known values for both Temperature and Windspeed, plus the NWS
// wind chill computed from them (T = e1.Value in °F, V = e2.Value in mph),
// rounded to the nearest degree.
var measurements = from e1 in temperatureSignal
                   join e2 in windspeedSignal on e1.Id equals e2.Id
                   select new
                   {
                       Id = e1.Id,
                       Temperature = e1.Value,
                       Windspeed = e2.Value,
                       Windchill = Math.Round(35.74
                                   + (0.6215 * e1.Value)
                                   - (35.75 * Math.Pow(e2.Value, 0.16))
                                   + (0.4275 * e1.Value * Math.Pow(e2.Value, 0.16)))
                   };
```
And observe the calculated results.
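To sanity-check the formula outside StreamInsight, the projection's expression can be pulled into an ordinary C# function. WindChill is a hypothetical helper name, and the range guard is an addition that is not in the query: it returns NaN outside the range for which the National Weather Service defines the index (air temperature at or below 50 °F and wind speed above 3 mph). The two checks use values from the NWS wind chill chart.

```csharp
using System;

// NWS wind chill (T in °F, V in mph): the same expression as the projection.
// Returns NaN outside the defined range (T <= 50 °F and V > 3 mph); this guard is an addition.
static double WindChill(double temperatureF, double windMph)
{
    if (temperatureF > 50.0 || windMph <= 3.0)
        return double.NaN;
    double v16 = Math.Pow(windMph, 0.16);
    return 35.74 + 0.6215 * temperatureF - 35.75 * v16 + 0.4275 * temperatureF * v16;
}

// Two points from the NWS chart: 0 °F at 15 mph and 40 °F at 10 mph.
Console.WriteLine(Math.Round(WindChill(0, 15)));   // -19
Console.WriteLine(Math.Round(WindChill(40, 10)));  // 34

// First joined update from the sample data (20.1 °F, 30.1 mph).
Console.WriteLine(WindChill(20.1, 30.1));
```

All of the sample readings (17.1 to 22.5 °F, 30.1 to 50.3 mph) fall inside the defined range, so the guard would not change any result in this example.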
This query pattern will work for any data source where you need to work from, and synchronize, the "last known value" for a given set or subset of data. Note that if you are guaranteed to have synchronized timestamps on incoming data, the point-to-signal conversion steps are not necessary before performing the join (as the timestamps of the incoming events already have the appropriate overlap).
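The synchronized case can be sketched the same way in plain LINQ-to-objects. Again this is not the StreamInsight API; the readings and the Asset helper are hypothetical. When both sensors report with identical timestamps, an equality join on the station key plus the timestamp pairs the readings directly, with no last-known-value conversion.

```csharp
using System;
using System.Linq;

// Hypothetical readings where the PLC reports both sensors with identical timestamps.
var t0 = DateTime.Parse("2009-10-23T09:00:00");
var readings = new[]
{
    (Id: "Station10.Temperature", Timestamp: t0,                Value: 20.1),
    (Id: "Station10.Windspeed",   Timestamp: t0,                Value: 30.1),
    (Id: "Station10.Temperature", Timestamp: t0.AddSeconds(30), Value: 18.1),
    (Id: "Station10.Windspeed",   Timestamp: t0.AddSeconds(30), Value: 40.4),
};

// Split "asset.sensor" on the first '.', as the post's query does.
static string Asset(string id) => id.Substring(0, id.IndexOf('.'));

var temps = readings.Where(r => r.Id.EndsWith(".Temperature"));
var winds = readings.Where(r => r.Id.EndsWith(".Windspeed"));

// With synchronized timestamps, pairing on (asset, timestamp) is enough.
var pairs = temps.Join(winds,
        t => (Asset(t.Id), t.Timestamp),
        w => (Asset(w.Id), w.Timestamp),
        (t, w) => (Station: Asset(t.Id), t.Timestamp, Temperature: t.Value, Windspeed: w.Value))
    .ToList();

foreach (var p in pairs)
    Console.WriteLine($"{p.Station} {p.Timestamp:HH:mm:ss} T={p.Temperature} V={p.Windspeed}");
```

If the timestamps drift by even one tick, this exact-match join silently drops the pair. That is why the unsynchronized case above needs the signal conversion instead.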