MapReduce Based Co-occurrence Approach to an Item Based Recommender
In a previous post I covered the basics of a Co-occurrence Approach to an Item Based Recommender. As promised, here is the continuation of this work: an implementation of the same algorithm using MapReduce. Before reading this post it is worth reading the Local version, as it covers the sample data and general co-occurrence concepts. The MapReduce example uses the same data as the Local approach and generates the same recommendations.
As always the complete Local and MapReduce code can be downloaded from:
https://code.msdn.microsoft.com/Co-occurrence-Approach-to-57027db7
As a recap, the approach taken for the item-based recommender is to define a co-occurrence Matrix based on purchased items; that is, the products purchased together within an order. The MapReduce variant, rather than creating a single Matrix, will create a series of Sparse Vectors, so once again I will be using the Math.Net Numerics libraries. The actual Mapper and Reducer types will be written in F#, and the job submitted using the “Generics based Framework for Composing and Submitting .Net Hadoop MapReduce Jobs”.
The MapReduce Approach
The approach one has to take with MapReduce is a little different from the Local implementation. The objective of the MapReduce phases is to construct a series of Sparse Vectors, where each Sparse Vector represents the co-occurrence recommendation values for a single product. One can think of these as the rows of a Sparse Matrix, but constructed independently and possibly output across several files.
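Each of these Sparse Vectors is held using the Math.Net Numerics types. As a minimal sketch, assuming the Math.Net Numerics F# namespaces and mirroring the SparseVector.ofSeq call used in the Reducer later in this post (the size, indices, and values are illustrative):

open MathNet.Numerics.LinearAlgebra.Double

// Illustrative only: a sparse row of co-occurrence values in which the
// products at indices 2 and 5 carry weights 10.0 and 4.0 respectively
let row = SparseVector.ofSeq 8 (seq { yield (2, 10.0); yield (5, 4.0) })
printfn "Co-occurrence value at index 2: %f" row.[2]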
To perform the calculation one will have to run two consecutive jobs. The first MapReduce job will take in the order detail lines and for each order output the list of products, with an associated co-occurrence quantity. It is from this data that one can construct the co-occurrence product pairs and hence the necessary vector values.
The second MapReduce job will use as input the output from the previous job. The Map phase will take each order and corresponding product lists, emitting the co-occurrence product and quantity pairs. The Reduce phase then constructs the Sparse Vectors for each product.
To cover these processes the following F# Record definitions will be required:
type OrderDetail = { OrderId:int; OrderDate:DateTime; ProductId:int; OrderQty:int}

type ProductQuantity = { ProductId:int; Quantity:float}

type ProductQuantityList() =
    inherit List<ProductQuantity>()

type ProductRecommendations = {ProductId:int; Offset:int; Recommendations:SparseVector}
Using these type definitions the MapReduce job phases can be outlined as follows:
Stage | In Key | In Type | Out Key | Out Type |
Map 1 | | OrderDetail | Order Id | ProductQuantity |
Reduce 1 | Order Id | ProductQuantity | Order Id | ProductQuantityList |
Map 2 | (Order Id) | ProductQuantityList | Product Id | ProductQuantity |
Reduce 2 | Product Id | ProductQuantity | Product Id | ProductRecommendations |
However, one has to remember that the actual input/output types are sequences (IEnumerables in C#) of the specified types. Also, the key into the second mapper is derived from the input data as the first tab-separated field, the data being the output of the first MapReduce job.
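For illustration, a single line of this intermediate data, for a hypothetical order 42 containing two products, would look something like the following (tab separated; the exact Json layout depends on the serialization helpers):

42	[{"ProductId":711,"Quantity":5.0},{"ProductId":864,"Quantity":2.0}]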
This is the basic version of the type mappings, but for both Map stages there are optimizations one can take. Also, for the second MapReduce job a Combiner can be used that may help performance.
Note: The ProductQuantityList type exists solely to support Json serialization, which is now used as the serialization format for all data in and out of the MapReduce jobs.
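As a minimal illustration of the round-trip, assuming a Json.NET-style serializer (the framework's actual serialization helpers may differ), the named subtype gives the serializer a concrete, non-generic type to work with:

open Newtonsoft.Json

// Illustrative only: serializing and deserializing the named list type
let products = ProductQuantityList()
products.Add { ProductQuantity.ProductId = 711; Quantity = 5.0 }
let json = JsonConvert.SerializeObject(products)
let roundTrip = JsonConvert.DeserializeObject<ProductQuantityList>(json)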
Order MapReduce Phase
The first, order processing, MapReduce job will take in the order detail lines and output, for each order, the list of products with an associated co-occurrence quantity. The main purpose of the Map phase is to strip down the data into an order identifier and a ProductQuantity. In this case the quantity being emitted is the value adjusted for recent orders, as in the Local case. The Reduce phase just outputs the input sequence.
The core sequence code for this base Mapper would be:
seq {
    let orderLine = Helpers.ParseInputData value
    if orderLine.IsSome then
        let order = orderLine.Value
        let product = {
            ProductQuantity.ProductId = order.ProductId;
            Quantity = (min (float order.OrderQty) qtyMaximum) * (orderFactor order)
        }
        yield (getOrderKey order.OrderId, product)
}
However, this approach does not take advantage of the fact that the input data is sorted on the order identifier. If one assumes this ordering, an easy optimization is, rather than emitting a single ProductQuantity at a time, to emit the list of all the products for each order. This reduces the volume of output data, and hence the work for the Shuffle and Sort phase. For the case where the data is not sorted, or is split across mappers, the Reducer will perform the final aggregation of the data.
In this optimized version the type mapping becomes:
Stage | In Key | In Type | Out Key | Out Type |
Map | | OrderDetail | Order Id | ProductQuantityList |
Reduce | Order Id | ProductQuantityList | Order Id | ProductQuantityList |
This leads to a full Mapper code listing of the following:
Order Mapper
type OrderVectorMapper() =
    inherit MapperBaseText<ProductQuantityList>()

    // Configuration values
    let qtyMaximum = 5.0 // Maximum rating contribution for an item
    let recentFactor = 2.0 // Quantity increase factor for recent items
    let baseDate = DateTime.Today.AddMonths(-3) // Date for a recent item

    let products = ProductQuantityList()
    let mutable currentOrder = None

    // Converts an order Id to a string key
    let getOrderKey (orderId:int) =
        sprintf "%i" orderId

    /// Map the data from input name/value to output name/value
    override self.Map (value:string) =

        // Adds a quantity factor based on recent orders
        let inline orderFactor (order:OrderDetail) =
            if DateTime.Compare(order.OrderDate, baseDate) > 0 then
                recentFactor
            else
                1.0

        // Process the order
        let orderLine =
            try
                Some(Helpers.ParseInputData value)
            with
            | :? System.ArgumentException -> None

        seq {
            if orderLine.IsSome then
                let order = orderLine.Value
                let product = {
                    ProductQuantity.ProductId = order.ProductId;
                    Quantity = (min (float order.OrderQty) qtyMaximum) * (orderFactor order)}
                if currentOrder.IsSome && not (order.OrderId = currentOrder.Value) then
                    yield (getOrderKey currentOrder.Value, products)
                    products.Clear()
                currentOrder <- Some(order.OrderId)
                products.Add product
            else
                Context.IncrementCounter("ORDERS", "Skipped Lines")
        }

    /// Output remaining Map items
    override self.Cleanup() = seq {
        if currentOrder.IsSome then
            yield (getOrderKey currentOrder.Value, products)
    }
To perform this optimization a ProductQuantityList is maintained and the corresponding sequence is emitted whenever the order identifier changes. The final Cleanup() step flushes any remaining values.
The calculation of the co-occurrence quantity is the same as in the Local case. The order quantity is used, capped at a maximum value. The quantity is also adjusted based on the order date. During the next MapReduce phase the final quantity is taken as the maximum of the quantity for each product.
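As a worked example, using the configuration values from the Mapper above, an order line for 7 units placed within the last three months contributes:

// Quantity capped at qtyMaximum, then scaled by the recency factor
let qtyMaximum = 5.0   // cap from the Mapper configuration
let recentFactor = 2.0 // recency factor from the Mapper configuration
let quantity = (min 7.0 qtyMaximum) * recentFactor   // (min 7.0 5.0) * 2.0 = 10.0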
As previously noted, in this case, the Reducer just re-emits the aggregated input data:
Order Reducer
type OrderVectorReducer() =
    inherit ReducerBase<ProductQuantityList, ProductQuantityList>()

    /// Reduce the order data into a product list
    override self.Reduce (key:string) (values:seq<ProductQuantityList>) =
        let products = ProductQuantityList()
        values
        |> Seq.iter (Seq.iter products.Add)
        Seq.singleton (key, products)
To submit this job one would use the following command:
%HOMEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
-input "recommendations/inputdata" -output "recommendations/workingdata"
-mapper "MSDN.Recommender.MapReduce.OrderVectorMapper, MSDN.Recommender.MapReduce"
-reducer "MSDN.Recommender.MapReduce.OrderVectorReducer, MSDN.Recommender.MapReduce"
-file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.MapReduce.dll"
-file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.dll"
The data from this MapReduce job is then fed into the next phase.
Product MapReduce Phase
The second, product processing, MapReduce job constructs the Sparse Vectors for each product identifier. The Map phase will take each order, with its corresponding products and quantities, and emit the pairs of products along with the final co-occurrence quantity, being the maximum of the two possible values. The Reduce phase will then sum all the co-occurrence values for each product and construct the final product Sparse Vectors.
In this instance a Combine operation can be advantageous to performance. This results in a type mapping of:
Stage | In Key | In Type | Out Key | Out Type |
Map | (Order Id) | ProductQuantityList | Product Id | ProductQuantity |
Combine | Product Id | ProductQuantity | Product Id | ProductQuantity |
Reduce | Product Id | ProductQuantity | Product Id | ProductRecommendations |
If one takes this simple approach the base Map code would be along the lines of the following:
seq {
    for (product1, product2) in ((deserialize value) |> pairs) do
        let qty = max product1.Quantity product2.Quantity
        yield getProductKey product1.ProductId, {ProductQuantity.ProductId = product2.ProductId; Quantity = qty}
        yield getProductKey product2.ProductId, {ProductQuantity.ProductId = product1.ProductId; Quantity = qty}
}
However, as in the first MapReduce job, there is an optimization one can take. The basic approach just parses the input data from the previous job and emits each product pair without aggregating any of the data. However, there is the option of aggregating data within the Mapper, thus reducing the data that needs to be passed to the Reducer and the overhead of the Shuffle and Sort phase.
To perform this optimization a Dictionary is used, with the Key being a Tuple of the product pair and the Value being the calculated co-occurrence quantity. Obviously one cannot build up this Dictionary indefinitely, so the values are emitted once a predefined Dictionary size is reached. At this point the in-Mapper aggregation reinitializes the Dictionary and starts again. Once again, the final Cleanup() step flushes any remaining Dictionary values.
This leads to a full Mapper code listing of:
Product Mapper
type ProductVectorMapper() =
    inherit MapperBaseText<ProductQuantity>()

    let maxSize = 1024*1024
    let prodPairs = Dictionary<int*int, float>(maxSize)

    // Converts an order Id to a string key
    let getProductKey (productId:int) =
        sprintf "%i" productId

    // Adds to the table
    let addRow idx qty =
        if prodPairs.ContainsKey idx then
            prodPairs.[idx] <- prodPairs.[idx] + qty
        else
            prodPairs.[idx] <- qty
        ()

    // Defines a sequence of the current pairs
    let currents = seq {
        for item in prodPairs do
            let (product1, product2) = item.Key
            yield getProductKey product1, {ProductQuantity.ProductId = product2; Quantity = item.Value}
        prodPairs.Clear()
    }

    /// Map the data from input name/value to output name/value
    override self.Map (value:string) =

        // Parses an input line of format List<ProductQuantity>
        let deserialize (input:string) =
            let keyValue = input.Split('\t')
            Helpers.ParseProductQuantityList (keyValue.[1].Trim())

        // calculates the pairs for an order
        let rec pairs (items:List<'a>) = seq {
            let count = items.Count
            match count with
            | 0 | 1 -> ()
            | 2 ->
                yield items.[0], items.[1]
            | _ ->
                for idxOut = 0 to (count - 2) do
                    for idxIn = (idxOut + 1) to (count - 1) do
                        yield (items.[idxOut], items.[idxIn])
        }

        // Define the sequence to return the product/product pairs information
        (deserialize value)
        |> pairs
        |> Seq.iter (fun (product1, product2) ->
            let qty = max product1.Quantity product2.Quantity
            addRow (product1.ProductId, product2.ProductId) qty
            addRow (product2.ProductId, product1.ProductId) qty)

        if prodPairs.Count > maxSize then
            currents
        else
            Seq.empty

    /// Output remaining Map items
    override self.Cleanup() =
        currents
A secondary optimization is to run a Combiner against the output of each Mapper. If one uses a large Dictionary object in the Mapper, the need for a Combiner is diminished. The code for such a Combiner just performs further quantity aggregation against each Mapper's output:
Product Combiner
type ProductVectorCombiner() =
    inherit CombinerBase<ProductQuantity>()

    /// Combine the data from input name/value to output name/value
    override self.Combine (key:string) (values:seq<ProductQuantity>) =
        let maxSize = 100000 // Expected number of product correlations
        let products = Dictionary<int, float>(maxSize)

        // Adds to the table
        let addRow idx qty =
            if products.ContainsKey idx then
                products.[idx] <- products.[idx] + qty
            else
                products.[idx] <- qty
            ()

        // Process the combiner input
        values
        |> Seq.iter (fun product -> addRow product.ProductId product.Quantity)

        seq {
            for item in products do
                yield key, {ProductQuantity.ProductId = item.Key; Quantity = item.Value}
        }
Once all the product pairs have been emitted by the Mappers, the Reducer can build a Sparse Vector for each product. This is done by aggregating all the co-occurrence values as the element values of the Sparse Vector, much in the same way that the Sparse Matrix is constructed in the Local version:
Product Reducer
type ProductVectorReducer() =
    inherit ReducerBase<ProductQuantity, ProductRecommendations>()

    /// Reduce the data from input name/value to output name/value
    override self.Reduce (key:string) (values:seq<ProductQuantity>) =

        // Configuration values
        let entryThreshold = 20.0 // Minimum correlations for matrix inclusion
        let matrixSize = 5000 // Expected Correlations for Hash Table init

        let minItem = ref Int32.MaxValue
        let maxItem = ref 0
        let rcTable = Dictionary<int, float>(matrixSize)

        // Adds to the table
        let addRow idx qty =
            minItem := min idx !minItem
            maxItem := max idx !maxItem
            if rcTable.ContainsKey idx then
                rcTable.[idx] <- rcTable.[idx] + qty
            else
                rcTable.[idx] <- qty
            ()

        // Process the reducer input
        values
        |> Seq.iter (fun product -> addRow product.ProductId product.Quantity)

        let offset = !minItem
        let size = !maxItem + 1 - !minItem

        let items = seq {
            for item in rcTable do
                if item.Value > entryThreshold then
                    yield (item.Key - offset, item.Value)
        }

        let recommendations = {ProductRecommendations.ProductId = Int32.Parse(key); Offset = offset; Recommendations = SparseVector.ofSeq size items}
        Context.IncrementCounter("PRODUCTS", "Recommendations Written")
        Seq.singleton (key, recommendations)
To submit this job one would use the following command:
%HOMEPATH%\MSDN.Hadoop.MapReduce\Release\MSDN.Hadoop.Submission.Console.exe
-input "recommendations/workingdata/part-0000[012356789]" -output "recommendations/outputdata"
-mapper "MSDN.Recommender.MapReduce.ProductVectorMapper, MSDN.Recommender.MapReduce"
-reducer "MSDN.Recommender.MapReduce.ProductVectorReducer, MSDN.Recommender.MapReduce"
-file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.MapReduce.dll"
-file "%HOMEPATH%\MSDN.Recommender\Release\MSDN.Recommender.dll"
-file "%HOMEPATH%\MSDN.Recommender\Release\MathNet.Numerics.dll"
-file "%HOMEPATH%\MSDN.Recommender\Release\MathNet.Numerics.FSharp.dll"
If you review the previous code you will see that, for each product, the Sparse Vector recommendations are accompanied by an Offset. This is the same offset used in the Local version of the code, and represents the identifier of the first product in the vector. This is purely an optimization for querying the data.
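As a minimal sketch of how the Offset is applied when reading a value back (the valueFor helper is illustrative, not part of the library):

// Illustrative helper: the co-occurrence value for a product is stored
// at index (productId - Offset) within the product's Sparse Vector
let valueFor (recs:ProductRecommendations) (productId:int) =
    let idx = productId - recs.Offset
    if idx >= 0 && idx < recs.Recommendations.Count then recs.Recommendations.[idx]
    else 0.0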
The output from this job can then be saved and loaded into a Query engine to produce product recommendations.
Product Recommendations
Once the co-occurrence Sparse Vectors have been constructed they can be loaded into memory and queried in a very similar fashion to the Local case. To perform the loading, a Dictionary is constructed where the Key is the product identifier and the Value is the ProductRecommendations type, containing the co-occurrence Sparse Vector:
Vector Builder
module VectorLoader =
    // Defines a sequence of product vectors
    let private vectorFile (mappedfile:string) = seq {
        use reader = new StreamReader(Path.GetFullPath(mappedfile))
        while not reader.EndOfStream do
            let line = reader.ReadLine()
            let keyValue = line.Split('\t')
            let (_, value) = (Int32.Parse(keyValue.[0].Trim()), keyValue.[1].Trim())
            yield (Helpers.ParseVectorData value)
        reader.Close()
    }

    /// Loads a collection of Product Vector files
    let GetProductVectors (filenames:string array) =
        let products = ConcurrentDictionary<int, ProductRecommendations>()
        filenames
        |> Array.iter (fun filename ->
            vectorFile filename
            |> Seq.iter (fun product -> products.TryAdd(product.ProductId, product) |> ignore))
        products

    /// Loads a single Product Vector file
    let GetProductVector (filename:string) =
        let products = ConcurrentDictionary<int, ProductRecommendations>()
        (vectorFile filename)
        |> Seq.iter (fun product -> products.TryAdd(product.ProductId, product) |> ignore)
        products
Once the Vectors have been loaded they can be queried in the same way as for the Local version. Basically, the Sparse Vector values for the products for which a recommendation is required are loaded into a PriorityQueue. The top X items are then de-queued and returned as the recommendations:
Vector Query
type VectorQuery (filenames:string array) =
    let defaultRecommendations = 12

    let coVectors =
        match filenames with
        | [||] -> invalidArg "filenames" "Filenames cannot be an empty Array"
        | [| filename |] -> VectorLoader.GetProductVector(filename)
        | _ -> VectorLoader.GetProductVectors(filenames)

    let getQueueSV (products:int array) =
        // Define the priority queue and lookup table
        let queue = PriorityQueue(coVectors.Count)
        let lookup = HashSet(products)

        // Add the items into a priority queue
        products
        |> Array.iter (fun item ->
            if item >= 0 && coVectors.ContainsKey(item) then
                let product = coVectors.[item]
                let recommendations = product.Recommendations
                seq {
                    for idx = 0 to (recommendations.Count - 1) do
                        let productIdx = idx + product.Offset
                        if (not (lookup.Contains(productIdx))) && (recommendations.[idx] > 0.0) then
                            yield KeyValuePair(recommendations.[idx], productIdx)
                }
                |> queue.Merge)

        // Return the queue
        queue

    let getItems (queue:PriorityQueue<float, int>) (items:int) =
        let toDequeue = min items queue.Count
        seq { for i in 1 .. toDequeue do yield queue.Dequeue().Value }

    new(filename:string) =
        VectorQuery([| filename |])

    /// Get the requested number of Recommendations for a Product
    member self.GetRecommendations(product:int, items:int) =
        let queue = getQueueSV([| product |])
        getItems queue items

    /// Get the requested number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array, items:int) =
        let queue = getQueueSV(products)
        getItems queue items

    /// Get the default number of Recommendations for a Product
    member self.GetRecommendations(product:int) =
        self.GetRecommendations(product, defaultRecommendations)

    /// Get the default number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array) =
        self.GetRecommendations(products, defaultRecommendations)
Usage of the VectorQuery type is simply a matter of specifying which files to load and then calling the GetRecommendations() function, in exactly the same way as for the Local case.
let itemQuery = VectorQuery(vectorFiles)
let recommendations = itemQuery.GetRecommendations(850, 100)
The API also supports getting recommendations for a product array; namely a shopping basket.
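For example, with hypothetical product identifiers for the basket:

let basket = [| 850; 215; 978 |]
let basketRecommendations = itemQuery.GetRecommendations(basket, 100)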
Conclusion
One thing worth pointing out, to conclude these blog posts, is that I have skipped over how to manage the caching of the Sparse Matrix and Vector values. In reality one would build/load the data only once and cache the result. This cached value would then be used by the recommendations API, rather than the filenames. The cache would also need to allow multiple threads to access the loaded data, and to manage refreshing of the data.
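A minimal sketch of such a cache, wrapping the VectorQuery in a lazily initialized value (the QueryCache module shown here is illustrative and not part of the downloadable code):

module QueryCache =
    // Illustrative only: build the VectorQuery once, on first use, and share
    // the instance across query threads; Lazy initialization is thread safe
    let mutable private cache : Lazy<VectorQuery> option = None

    /// Points the cache at a new set of vector files, e.g. after a new MapReduce run
    let Refresh (filenames:string array) =
        cache <- Some (lazy (VectorQuery(filenames)))

    /// Returns the shared VectorQuery; Refresh must have been called first
    let Instance() =
        match cache with
        | Some query -> query.Force()
        | None -> invalidOp "QueryCache has not been initialized"

Refresh would be called at start-up, and again whenever a new set of output files is produced, with all query threads sharing the single loaded instance returned by Instance().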
As a final reminder, to install the libraries from NuGet one can use the Manage NuGet Packages browser, or run these commands in the Package Manager Console:
Install-Package MathNet.Numerics
Install-Package MathNet.Numerics.FSharp
To conclude these posts, during the coming week, I will also implement the C# version of the code.