A lazy evaluation of F# Seq.groupBy for sorted sequences
In some recent work with Hadoop I needed to process a sequence that was grouped by a projected key. Seq.groupBy can perform this operation, but it makes no assumption about the ordering of the original sequence. As a consequence it has to read the entire input before it can return the first group, so it is not suitable for the possibly large sequences one needs to process when running Hadoop MapReduce jobs. Thus I had to create a lazily evaluated grouping based on a projected key, one that relies on the input already being sorted by that key.
To achieve this the processing has to be state-dependent, so that it can handle the transition from one key value to the next. The input data is processed in such a fashion that any change in key value causes a transition to the next sequence. The persisted state is needed to track the key change and to ensure that the first value yielded by the new sequence is the value that caused the transition, so that transition values are not lost.
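To make the problem concrete, here is a small sketch that counts how many items Seq.groupBy pulls from its source before the first group becomes available. The `consumed` counter and the ten-item `source` are made up for illustration only.

```fsharp
// Hypothetical counter used to observe how much of the source is read
let consumed = ref 0

// Ten items keyed by i / 5, already in key order
let source =
    seq {
        for i in 1 .. 10 do
            consumed.Value <- consumed.Value + 1
            yield (i / 5, i)
    }

// Ask only for the first group
let firstKey, _ = source |> Seq.groupBy fst |> Seq.head

// The whole source was read to produce it: consumed.Value is 10
printfn "first key = %d, items consumed = %d" firstKey consumed.Value
```

With ten items this is harmless, but the same behaviour applies to an input of any size.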
In writing the new Seq grouping extensions I wanted to provide the following prototypes:
groupByOrdered : ('T -> 'Key) -> seq<'T> -> seq<'Key * seq<'T>>
groupByOrderedWith : ('T -> ('Key * 'Value)) -> seq<'T> -> seq<'Key * seq<'Value>>
The groupByOrdered function provides the same prototype as the Seq.groupBy function. The groupByOrderedWith function provides a means for modifying the type of the returned sequence values. This extension was useful in the Hadoop MapReduce processing.
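As a point of reference for the second prototype, the same result shape can be produced eagerly using only the standard library. The sketch below is not the lazy implementation; `groupByWithEager` is a hypothetical name, and it is only meant to show what a projection to a (key, value) pair does to the returned groups.

```fsharp
// Eager reference version with the same shape as groupByOrderedWith:
// ('T -> 'Key * 'Value) -> seq<'T> -> seq<'Key * seq<'Value>>
let groupByWithEager (projection: 'T -> 'Key * 'Value) (source: seq<'T>) : seq<'Key * seq<'Value>> =
    source
    |> Seq.map projection
    |> Seq.groupBy fst
    |> Seq.map (fun (key, pairs) -> key, Seq.map snd pairs)

// Group words by length, returning the upper-cased word as the value
let words = [ "ant"; "bee"; "crow"; "deer"; "eagle" ]

let byLength =
    words
    |> groupByWithEager (fun (w: string) -> (w.Length, w.ToUpper()))
    |> Seq.map (fun (key, values) -> key, Seq.toList values)
    |> Seq.toList
// byLength = [(3, ["ANT"; "BEE"]); (4, ["CROW"; "DEER"]); (5, ["EAGLE"])]
```

For input that is already sorted by key, the lazy version is intended to yield the same groups in the same order.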
A full implementation of the code is as follows:
module Seq =

    // Returns the type seq<'Key * seq<'Value>>
    let groupByOrderedWith (projection:('T -> ('Key * 'Value))) (source:seq<'T>) =

        let lastInput = ref None
        let continueDo = ref false
        let comparer = ComparisonIdentity.Structural<'Key>
        let enumerator = source.GetEnumerator()

        let getInput() =
            let found = enumerator.MoveNext()
            if found then
                let value = enumerator.Current
                Some(projection value)
            else
                None

        let inputsByKey (key:'Key) (firstValue:'Value) = seq {
            // Yield any value from previous read
            yield firstValue

            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when comparer.Compare(fst input, key) = 0 ->
                    // Yield found value and remainder of sequence
                    yield (snd input)
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
        }

        let rec processInput (input:('Key * 'Value) option) = seq {
            if input.IsSome then
                let key = fst input.Value
                let value = snd input.Value
                yield (key, inputsByKey key value)
                yield! processInput (lastInput.contents)
        }

        processInput (getInput())

    // Returns the type seq<'Key * seq<'T>>
    let groupByOrdered (projection:('T -> 'Key)) (source:seq<'T>) =
        groupByOrderedWith (fun value -> (projection value, value)) source
So why write this code? If one has to perform a Seq.groupBy over a large ordered sequence, one can now group a sequence of tuples, ready for processing, with the following code:
let groupedSequence = Seq.groupByOrderedWith (fun value -> (fst value, snd value)) mySequence
Hopefully you will agree this code is simple to use. However, a word of warning is warranted. These extensions should only be used when one knows the input data is sorted by the key. In addition, because every group reads from the same underlying enumerator, each inner sequence needs to be fully enumerated before moving on to the next group, and the result can only be enumerated once.
A final word on the use of the while statement. Although the inner sequence could be generated with a recursive function, there is an issue with my use of the match..when construct: it means the function would not be guaranteed to be tail recursive, hence the rationale for using a while loop instead.
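If one is not certain the input is sorted, one option is to check it as it streams past. The following is a minimal sketch under my own assumptions: `assertSortedBy` is a hypothetical helper, not part of the extensions above, and it treats "sorted" as non-decreasing keys under the default F# comparison. It stays lazy, passing each item through and raising only when a key goes backwards.

```fsharp
// Hypothetical guard: passes items through lazily and raises
// InvalidOperationException if a key is smaller than the one before it.
let assertSortedBy (projection: 'T -> 'Key) (source: seq<'T>) : seq<'T> =
    seq {
        // Last key seen so far (None before the first item)
        let previous = ref None
        for item in source do
            let key = projection item
            match previous.Value with
            | Some last when compare key last < 0 ->
                invalidOp (sprintf "Input is not sorted: key %A follows %A" key last)
            | _ -> ()
            previous.Value <- Some key
            yield item
    }

// Sorted input passes through unchanged
let checkedItems =
    [ (1, "a"); (1, "b"); (2, "c") ]
    |> assertSortedBy fst
    |> Seq.toList
```

Such a guard could be piped in front of the grouping call, at the cost of one extra comparison per item.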
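For comparison, the recursive shape referred to above might look roughly like the sketch below. `valuesForKey` and its `onStop` callback are hypothetical names introduced for illustration and are not part of the module. Whether the recursive yield! is compiled as a tail call is exactly the concern raised above, so treat this as an illustration rather than a recommended replacement.

```fsharp
open System.Collections.Generic

// Hypothetical helper: yields the values that share `key`, then hands the
// first non-matching item (or None at end of input) to `onStop`.
let rec valuesForKey (enumerator: IEnumerator<'Key * 'Value>)
                     (key: 'Key)
                     (onStop: ('Key * 'Value) option -> unit) : seq<'Value> =
    seq {
        if enumerator.MoveNext() then
            let (k, v) = enumerator.Current
            if k = key then
                yield v
                // Recursive step in place of the while loop
                yield! valuesForKey enumerator key onStop
            else
                // Different key: remember the item that caused the transition
                onStop (Some (k, v))
        else
            // No more entries
            onStop None
    }

let data = Seq.ofList [ (1, "a"); (1, "b"); (2, "c") ]
let enumerator = data.GetEnumerator()
let stoppedAt : (int * string) option ref = ref None

let firstGroup =
    valuesForKey enumerator 1 (fun next -> stoppedAt.Value <- next)
    |> Seq.toList
// firstGroup = ["a"; "b"], stoppedAt.Value = Some (2, "c")
```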
Enjoy and Happy New Year.