Co-occurrence Approach to an Item Based Recommender
For a while I have been meaning to tackle the problem of creating an item-based recommender. I will start with a local variant before moving on to a MapReduce version. The current version of the code can be found at:
https://code.msdn.microsoft.com/Co-occurrence-Approach-to-57027db7
The approach taken for the item-based recommender is to define a co-occurrence matrix based on purchased items, that is, products purchased together on an order. As an example I will use the sales data from the Adventure Works SQL Server sample database. The algorithm is best explained, and is implemented, using a matrix.
For the matrix implementation I will be using the Math.Net Numerics libraries. To install the libraries from NuGet one can use the Manage NuGet Packages browser, or run these commands in the Package Manager Console:
Install-Package MathNet.Numerics
Install-Package MathNet.Numerics.FSharp
Co-occurrence Sample
Let's start with a simple sample. Consider the following order summary:
Order ID | Product ID1 | Product ID2 | Product ID3 |
500001 | 1001 | 1003 | 1005 |
500002 | 1001 | 1002 | 1003 |
500003 | 1002 | 1003 | |
500004 | 1003 | 1004 | 1005 |
500005 | 1003 | 1005 | |
500006 | 1003 | 1004 | |
500007 | 1002 | 1003 | |
500008 | 1001 | 1003 | 1004 |
500009 | 1003 | 1004 | 1005 |
If you look at the orders containing product 1002, you will see that there is 1 occurrence of product 1001 and 3 occurrences of product 1003. From this, one can deduce that if someone purchases product 1002 then it is likely that they will want to purchase product 1003, and to a lesser extent product 1001. This concept forms the crux of the item-based recommender.
So if we computed the co-occurrence for every pair of products and placed them into a square matrix we would get the following:
Product ID | 1001 | 1002 | 1003 | 1004 | 1005 |
1001       |      | 1    | 3    | 1    | 1    |
1002       | 1    |      | 3    |      |      |
1003       | 3    | 3    |      | 4    | 4    |
1004       | 1    |      | 4    |      | 2    |
1005       | 1    |      | 4    | 2    |      |
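As a quick check of the counts, the matrix can be reproduced from the nine sample orders with a few lines of F#. This is only a minimal in-memory sketch; the names sampleOrders, pairsOf and countFor are made up for illustration and are not part of the downloadable code.

```fsharp
// The nine sample orders from the order summary, as (order id, product ids).
let sampleOrders =
    [ (500001, [ 1001; 1003; 1005 ])
      (500002, [ 1001; 1002; 1003 ])
      (500003, [ 1002; 1003 ])
      (500004, [ 1003; 1004; 1005 ])
      (500005, [ 1003; 1005 ])
      (500006, [ 1003; 1004 ])
      (500007, [ 1002; 1003 ])
      (500008, [ 1001; 1003; 1004 ])
      (500009, [ 1003; 1004; 1005 ]) ]

// All unordered pairs of products within one order.
let rec pairsOf items =
    match items with
    | head :: tail -> [ for item in tail -> (head, item) ] @ pairsOf tail
    | [] -> []

// Count every pair in both directions so that the result is symmetric.
let cooccurrence =
    sampleOrders
    |> List.collect (fun (_, products) -> pairsOf products)
    |> List.collect (fun (a, b) -> [ (a, b); (b, a) ])
    |> List.countBy id
    |> Map.ofList

// Missing entries correspond to the blank cells of the matrix.
let countFor a b = defaultArg (Map.tryFind (a, b) cooccurrence) 0

printfn "%d %d %d" (countFor 1002 1001) (countFor 1002 1003) (countFor 1002 1004)
// prints "1 3 0"
```

The printed values match the 1002 row of the matrix: one co-occurrence with 1001, three with 1003, and none with 1004.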
There are a few things we can say about such a matrix, other than that it is square, with the number of rows and columns equal to the number of items. For a large product set the matrix will more than likely be sparse. Also, each row (and each column, as the matrix is symmetric) expresses the similarities between an item and all others.
Due to the symmetry of the matrix about its diagonal (a_xy = a_yx) one can think of the rows and columns as vectors, where the similarity between items X and Y is the same as the similarity between items Y and X.
The approach I will be outlining is essentially a means to build and query this co-occurrence matrix. Co-occurrence acts as a measure of similarity: the more often two items occur together (in this case in a shopping basket), the more likely they are to be related.
Working Data
Before talking about the code a word is warranted on how I have created the sample data.
As mentioned before I am using the Adventure Works database sales information. The approach I have taken is to export the sales detail lines, ordered by the sales order identifier, into flat files. The rationale for this being that the processing of the matrix can then occur with a low impact to the OLTP system. Also, the code will support the parallel processing of multiple files. Thus one can take the approach of just exporting more recent data and using the previously exported archived data to generate the matrix.
To support this process I have created a simple view for exporting the data:
CREATE VIEW [Sales].[vSalesSummary]
AS
SELECT SOH.[SalesOrderID], CAST(SOH.[OrderDate] AS date) [OrderDate], SOD.[ProductID], SOD.[OrderQty]
FROM [Sales].[SalesOrderHeader] SOH
INNER JOIN [Sales].[SalesOrderDetail] SOD ON SOH.[SalesOrderID] = SOD.[SalesOrderID]
GO
I have then used the BCP command to export the data into a Tab delimited file:
bcp
"SELECT [SalesOrderID], [OrderDate], [ProductID], [OrderQty] FROM [AdventureWorks2012].[Sales].[vSalesSummary] ORDER BY [SalesOrderID], [ProductID]"
queryout SalesOrders.dat
-T -S (local) -c
One could easily define a similar process where a file is generated for each month, with the latest month being updated on a nightly basis. One could then easily ensure that only orders for the past X months are included in the co-occurrence counts. The only important aspect of how the data is generated is that the output must be ordered on the sales order identifier. As you will see later, this grouping is necessary to allow the co-occurrence data to be derived.
A sample output from the Adventure Works sales data is as follows, with the fields being tab delimited:
43659 2005-07-01 776 1
43659 2005-07-01 777 3
43659 2005-07-01 778 1
43660 2005-07-01 758 1
43660 2005-07-01 762 1
43661 2005-07-01 708 5
43661 2005-07-01 711 2
43661 2005-07-01 712 4
43661 2005-07-01 715 4
43661 2005-07-01 716 2
43661 2005-07-01 741 2
43661 2005-07-01 742 2
43661 2005-07-01 743 1
43661 2005-07-01 745 1
43662 2005-07-01 730 2
43662 2005-07-01 732 1
43662 2005-07-01 733 1
43662 2005-07-01 738 1
Once the data has been exported, building the matrix becomes a matter of counting the co-occurrence of products associated with each sales order.
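Each exported line first has to be split into its four fields. The parsing helper used by the full code is not listed in this post, so the following is only a hypothetical sketch of what such a parser might look like; the record ParsedLine and the function parseLine are assumed names, not the actual types from the download.

```fsharp
open System
open System.Globalization

// Hypothetical record for one exported detail line (not the article's OrderDetail type).
type ParsedLine =
    { OrderId: int
      OrderDate: DateTime
      ProductId: int
      OrderQty: int }

// Split one tab-delimited line into order id, order date, product id and quantity.
let parseLine (line: string) =
    match line.Split('\t') with
    | [| orderId; orderDate; productId; qty |] ->
        Some
            { OrderId = int orderId
              OrderDate = DateTime.ParseExact(orderDate, "yyyy-MM-dd", CultureInfo.InvariantCulture)
              ProductId = int productId
              OrderQty = int qty }
    | _ -> None // Unexpected number of fields: treat the line as unreadable

let firstLine = parseLine "43659\t2005-07-01\t776\t1"
```

Returning an option keeps the decision about malformed lines with the caller. The sketch assumes the date is exported in the yyyy-MM-dd form shown in the sample output.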
Building the Matrix
The problem of building a co-occurrence matrix is simply one of counting. The basic process is as follows:
- Read the file and group each sales order along with the corresponding list of product identifiers
- For each sales order product listing, calculate all the corresponding pairs of products
- Maintain/Update the running total of each pair of products found
- At the end of the file, output a sparse matrix based on the running totals of product pairs
To support parallelism, steps 1-3 are run in parallel for each input file, and the output of these steps is a collection of the product pairs with their co-occurrence counts. These collections are then combined to create a single sparse matrix.
In performing this processing it is important that the file data is read such that a sequence of products can be created efficiently for each sales order identifier, the grouping key. If you have been following my Hadoop Streaming blog entries you will see that this is the same process undertaken when processing data within a reducer step.
The matrix building code, in full, is as follows:
open System
open System.IO
open System.Collections.Generic
// SparseMatrix comes from the Math.NET Numerics packages installed earlier;
// the namespace below is assumed and may differ between package versions
open MathNet.Numerics.LinearAlgebra.Double

module MatrixBuilder =

    // Configuration values
    let qtyMaximum = 5.0                          // Maximum rating contribution for an item
    let entryThreshold = 20.0                     // Minimum correlations for matrix inclusion
    let recentFactor = 2.0                        // Quantity increase factor for recent items
    let baseDate = DateTime.Today.AddMonths(-3)   // Date for a recent item
    let matrixSize = 10000 * 500                  // Products*Correlations for Hash Table init

    // Gets the base data for building the sparse matrix
    let private getMatrixData filename =

        let minItem = ref Int32.MaxValue
        let maxItem = ref 0
        let rcTable = Dictionary<(int*int), float>(matrixSize)

        // Define file reader properties
        use reader = new StreamReader(Path.GetFullPath(filename))

        let getInput() =
            if reader.EndOfStream then
                None
            else
                let input = reader.ReadLine()
                if not (String.IsNullOrEmpty(input)) then
                    Some(Helpers.ParseInputData input)
                else
                    None

        // calculates the pairs for an order
        let rec pairs items = seq {
            match items with
            | head::tail ->
                for item in tail do yield head, item
                yield! pairs tail
            | _ -> ()
            }

        // Add a factor based on order properties - example recent orders
        let orderFactor (order:OrderHeader) =
            if DateTime.Compare(order.OrderDate, baseDate) > 0 then
                recentFactor
            else
                1.0

        // Adds to the table, tracking the smallest and largest product identifier seen
        let addRowColumn idx1 idx2 qty =
            minItem := min (min idx1 idx2) !minItem
            maxItem := max (max idx1 idx2) !maxItem
            if rcTable.ContainsKey (idx1, idx2) then
                rcTable.[(idx1, idx2)] <- rcTable.[(idx1, idx2)] + qty
            else
                rcTable.[(idx1, idx2)] <- qty
            ()

        // The main processor for the sequence for an order
        let processOrder (header:OrderHeader) (orders:OrderDetail seq) =
            let orderList = Seq.toList orders
            // if order only has one product then no correlation can be determined
            if (orderList.Length > 1) then
                pairs orderList
                |> Seq.iter (fun (order1, order2) ->
                    let qty = (min (float (max order1.OrderQty order2.OrderQty)) qtyMaximum) * (orderFactor header)
                    // Add both (row, column) and (column, row) to keep the matrix symmetric
                    addRowColumn order1.ProductId order2.ProductId qty
                    addRowColumn order2.ProductId order1.ProductId qty)

        // Creates a sequence of the input based on the provided orderId
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue
            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(orderDetail) when orderDetail.OrderId = key ->
                    // Yield found value and remainder of sequence
                    yield orderDetail
                | Some(orderDetail) ->
                    // Have a value but different key
                    lastInput := Some(orderDetail)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
            }

        // Controls the calling of the matrix maker
        let rec processInput (input:OrderDetail option) =
            match input with
            | Some(orderDetail) ->
                let header = {OrderHeader.OrderId = orderDetail.OrderId; OrderDate = orderDetail.OrderDate}
                processOrder header (inputsByKey orderDetail.OrderId orderDetail)
                processInput lastInput.contents
            | _ -> ()

        // Build the matrix/table from the input data
        processInput (getInput())

        // return the table definition along with the size
        (!minItem, !maxItem, rcTable)

    /// Build a Sparse matrix from the file data
    let GetMatrixParallel (filenames:string array) =

        // In Parallel gets the RC tables
        let results =
            filenames
            |> Array.Parallel.map (fun filename -> getMatrixData filename)

        // define the max sparse array size
        // (the minimum is seeded with Int32.MaxValue; seeding it with 0 would always give an offset of 0)
        let (minItem, maxItem) =
            results
            |> Array.fold (fun acc (idxMin, idxMax, _) ->
                (min idxMin (fst acc), max idxMax (snd acc))) (Int32.MaxValue, 0)

        let offset = minItem
        let size = maxItem + 1 - minItem

        // convert to a sparse matrix and return
        let items = seq {
            for (_, _, rcTable) in results do
                for item in rcTable do
                    if item.Value > entryThreshold then
                        yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
            }

        (offset, SparseMatrix.ofSeq size size items)

    /// Interface for a single file
    let GetMatrix (filename:string) =

        let (minItem, maxItem, rcTable) = getMatrixData filename
        let offset = minItem
        let size = maxItem + 1 - minItem

        // convert to a sparse matrix and return
        let items = seq {
            for item in rcTable do
                if item.Value > entryThreshold then
                    yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
            }

        (offset, SparseMatrix.ofSeq size size items)
In this instance the file data is processed such that the order data is grouped and exposed as an OrderHeader type, along with a collection of OrderDetail types. From this the pairs of products are easily calculated using the pairs function.
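To see what the pairs function produces, it can be run on its own against a short list. The function below is the same recursive sequence expression as in the listing, lifted out of the module; the three product identifiers are taken from order 43659 in the sample output.

```fsharp
// Every item is paired with each item that follows it in the list.
let rec pairs items = seq {
    match items with
    | head :: tail ->
        for item in tail do
            yield head, item
        yield! pairs tail
    | _ -> ()
}

let orderPairs = pairs [ 776; 777; 778 ] |> Seq.toList
// orderPairs = [(776, 777); (776, 778); (777, 778)]
```

An order with n lines therefore yields n(n-1)/2 pairs, each of which is added to the table in both directions.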
To maintain a co-occurrence count for each product pair a Dictionary is used. The key for the Dictionary is a tuple of the pair of products, with the value being the co-occurrence count. The rationale for using a Dictionary is that it supports O(1) lookups; whereas maintaining an Array will incur an O(n) lookup.
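The update of the running total can be sketched in isolation. The version below uses TryGetValue so that each update needs a single lookup; this is a small variation on the ContainsKey test used in the listing, and the names counts and addPair are illustrative only.

```fsharp
open System.Collections.Generic

// Running totals keyed by (row product, column product).
let counts = Dictionary<(int * int), float>()

// Add qty to the total for the pair, creating the entry on first sight.
let addPair (a: int) (b: int) (qty: float) =
    let key = (a, b)
    match counts.TryGetValue key with
    | true, current -> counts.[key] <- current + qty
    | false, _ -> counts.[key] <- qty

addPair 776 777 3.0
addPair 776 777 1.0
printfn "%.1f" counts.[(776, 777)]
// prints "4.0"
```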
The final decision to be made is what quantity should be added into the running total. One could use a value of 1 for all co-occurrences, but other options are available. The first is the order quantity. If one has a co-occurrence for products with quantities x and y, I have defined the contributing quantity as (max x y). The rationale is that an order line with a quantity greater than one is logically equivalent to having multiple lines for that product in the same order. If this were the case then the co-occurrence counting would reflect those additional lines. However, I have treated only the maximum quantity as the contributing factor. I have also capped the quantity so that small products ordered in large volumes do not skew the numbers, much like restricting a rating to the range 1-5.
One optional quantity factor I have included is based on the order header. The sample code applies a quantity scaling factor for recent orders. Again the rationale is that recent orders should have a greater effect on the co-occurrence values than older ones. All these scaling factors are optional and should be configured to give your desired results.
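A few worked values make the weighting concrete. The sketch below restates the calculation from the listing as a standalone function; pairWeight is an illustrative name, and the fixed cut-off date is assumed purely so that the results are repeatable (the listing uses three months before today).

```fsharp
open System

let qtyMaximum = 5.0   // Cap on the quantity contribution, as in the listing
let recentFactor = 2.0 // Multiplier for orders after the cut-off date

// Contribution of one product pair: the larger quantity, capped, then scaled for recent orders.
let pairWeight (qty1: int) (qty2: int) (orderDate: DateTime) (cutOff: DateTime) =
    let capped = min (float (max qty1 qty2)) qtyMaximum
    if orderDate > cutOff then capped * recentFactor else capped

let cutOff = DateTime(2012, 3, 1) // Assumed cut-off for illustration

printfn "%.1f" (pairWeight 1 3 (DateTime(2012, 1, 15)) cutOff) // 3.0: larger of 1 and 3
printfn "%.1f" (pairWeight 8 2 (DateTime(2012, 1, 15)) cutOff) // 5.0: 8 is capped at 5
printfn "%.1f" (pairWeight 8 2 (DateTime(2012, 4, 1)) cutOff)  // 10.0: capped, then doubled as recent
```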
As mentioned, to achieve parallelism in the matrix building code the collection of input files can be processed in parallel, with each parallel step independently outputting its collection of product pairs and co-occurrence counts. This is achieved using an Array.Parallel.map function call, which maps each input file into the Dictionary for that file. Once all the Dictionary elements have been created they are used to create the sequence of elements from which the sparse matrix is built.
One other critical element returned along with the Dictionary is the range of product identifiers: the minimum and maximum values found. It is assumed that the product identifier is an integer which is used as an index into the matrix, for both row and column values. The maximum value, less the minimum value used as an offset, defines the row and column dimensions for the sparse matrix.
This approach works well for product ranges from 1 onwards, but what if product identifiers are defined from a large integer base, such as 100,000, or are GUIDs? In the former case one has the option of calculating an offset such that the index for the matrix is the product identifier minus the starting offset; this is the approach I have taken. The other option is to map the exported data such that the product numbers are defined from 1 onwards; again this can be a simple offset calculation. In the latter case, for GUIDs, one would need to map the GUID keys to sequential integers.
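If the same product pair can appear in more than one input file, one possible way to combine the per-file tables is to sum them into a single Dictionary before applying the inclusion threshold. This is a sketch of an alternative rather than what the listing does (the listing feeds each table's entries straight into the sparse matrix sequence), and mergeTables is an assumed name.

```fsharp
open System.Collections.Generic

// Sum the counts for each (row, column) key across all per-file tables.
let mergeTables (tables: seq<Dictionary<(int * int), float>>) =
    let merged = Dictionary<(int * int), float>()
    for table in tables do
        for KeyValue(key, value) in table do
            match merged.TryGetValue key with
            | true, current -> merged.[key] <- current + value
            | false, _ -> merged.[key] <- value
    merged

// Two small tables standing in for the output of two files.
let march = Dictionary<(int * int), float>()
march.[(776, 777)] <- 12.0
let april = Dictionary<(int * int), float>()
april.[(776, 777)] <- 15.0
april.[(776, 778)] <- 4.0

let combined = mergeTables [ march; april ]
printfn "%.1f" combined.[(776, 777)]
// prints "27.0"
```

With a threshold of 20, the pair (776, 777) would be excluded if each file were filtered on its own, but included once the counts are summed.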
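For the GUID case, the mapping to sequential integers could be kept in a pair of lookup structures. The following is a minimal sketch under the assumption that identifiers are assigned in the order they are first seen; the IndexMap type is hypothetical and not part of the downloadable code.

```fsharp
open System
open System.Collections.Generic

// Assigns each new key the next free matrix index and remembers the reverse mapping.
type IndexMap() =
    let toIndex = Dictionary<Guid, int>()
    let toKey = ResizeArray<Guid>()

    /// Matrix index for a key, allocating a new index on first use.
    member this.IndexOf(key: Guid) =
        match toIndex.TryGetValue key with
        | true, idx -> idx
        | false, _ ->
            let idx = toKey.Count
            toIndex.[key] <- idx
            toKey.Add key
            idx

    /// Original key for a matrix index, used when returning recommendations.
    member this.KeyOf(index: int) = toKey.[index]

    /// Number of distinct keys, which is also the required matrix dimension.
    member this.Count = toKey.Count

let map = IndexMap()
let productA = Guid.NewGuid()
let productB = Guid.NewGuid()
let indexA = map.IndexOf productA  // 0
let indexB = map.IndexOf productB  // 1
let indexA2 = map.IndexOf productA // 0 again
```

The same map would have to be shared by all files processed in parallel, or built in a separate pass, so that every file agrees on the indices.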
Querying for Item Similarity
So once the sparse matrix has been built, how does one make recommendations? There are basically two options: recommendations based on a single product selection, or recommendations for multiple products, say based on the contents of a shopping basket.
The basic process for performing the recommendations, in either case, is as follows:
- Select the sparse row vectors that correspond to the products for which a recommendation is required
- Place the row values into a PriorityQueue where the key is the co-occurrence count and the value the product identifier
- Dequeue and return, as a sequence, the required number of recommendations from the PriorityQueue
For the case of a single product the recommendations are just a case of selecting the correct single vector and returning the products with the highest co-occurrence count. The process for a selection of products is almost the same, except that multiple vectors are added into the PriorityQueue. One just needs to ensure that products that are already in the selection on which the recommendation is being made are excluded from the returned values. This is easily achieved with a HashSet lookup.
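The selection logic can be tried out on the sample matrix without any of the supporting types. The sketch below deliberately swaps the PriorityQueue for a plain sort and sums the scores when a candidate appears in more than one row; both are simplifications for illustration, and rows and recommend are assumed names.

```fsharp
open System.Collections.Generic

// Non-empty entries of the sample co-occurrence matrix, one list per row.
let rows : Map<int, (int * float) list> =
    Map.ofList
        [ (1001, [ (1002, 1.0); (1003, 3.0); (1004, 1.0); (1005, 1.0) ])
          (1002, [ (1001, 1.0); (1003, 3.0) ])
          (1003, [ (1001, 3.0); (1002, 3.0); (1004, 4.0); (1005, 4.0) ])
          (1004, [ (1001, 1.0); (1003, 4.0); (1005, 2.0) ])
          (1005, [ (1001, 1.0); (1003, 4.0); (1004, 2.0) ]) ]

// Return up to count products with the highest combined score, excluding the basket itself.
let recommend (basket: int list) (count: int) =
    let inBasket = HashSet<int>(basket)
    basket
    |> List.collect (fun product -> defaultArg (Map.tryFind product rows) [])
    |> List.filter (fun (candidate, _) -> not (inBasket.Contains candidate))
    |> List.groupBy fst
    |> List.map (fun (candidate, entries) -> (candidate, List.sumBy snd entries))
    |> List.sortByDescending snd
    |> List.truncate count
    |> List.map fst

let single = recommend [ 1002 ] 2
// single = [1003; 1001]
```

For a basket such as [1003; 1004] the HashSet lookup removes both basket products from the candidates, leaving 1001, 1005 and 1002.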
So the full code that wraps building the recommendation is as follows:
type MatrixQuery (filenames:string array) =

    let defaultRecommendations = 12

    let (offset, coMatrix) =
        match filenames with
        | [||] -> invalidArg "filename" "Filename cannot be an empty Array"
        | [| filename |] -> MatrixBuilder.GetMatrix(filename)
        | _ -> MatrixBuilder.GetMatrixParallel(filenames)

    let getQueueSM (products:int array) =
        // Define the priority queue and lookup table
        let queue = PriorityQueue(coMatrix.ColumnCount)
        let lookup = HashSet(products)

        // Add the items into a priority queue
        products
        |> Array.iter (fun item ->
            let itemIdx = item - offset
            if itemIdx >= 0 && itemIdx < coMatrix.ColumnCount then
                seq {
                    for idx = 0 to (coMatrix.ColumnCount - 1) do
                        let productIdx = idx + offset
                        if (not (lookup.Contains(productIdx))) && (coMatrix.[itemIdx, idx] > 0.0) then
                            yield KeyValuePair(coMatrix.[itemIdx, idx], productIdx)
                }
                |> queue.Merge)

        // Return the queue
        queue

    let getItems (queue:PriorityQueue<float, int>) (items:int) =
        let toDequeue = min items queue.Count
        seq { for i in 1 .. toDequeue do yield queue.Dequeue().Value }

    new(filename:string) =
        MatrixQuery([| filename |])

    /// Get the requested number of Recommendations for a Product
    member self.GetRecommendations(product:int, items:int) =
        let queue = getQueueSM([| product |])
        getItems queue items

    /// Get the requested number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array, items:int) =
        let queue = getQueueSM(products)
        getItems queue items

    /// Get the default number of Recommendations for a Product
    member self.GetRecommendations(product:int) =
        self.GetRecommendations(product, defaultRecommendations)

    /// Get the default number of Recommendations for a Product Array
    member self.GetRecommendations(products:int array) =
        self.GetRecommendations(products, defaultRecommendations)
Using the code is simply a matter of creating the MatrixQuery type, with the files to load, and then calling the GetRecommendations() method for the required products (shopping basket):
let filenames = [|
    @"C:\DataExport\SalesOrders201203.dat";
    @"C:\DataExport\SalesOrders201204.dat";
    @"C:\DataExport\SalesOrders201205.dat";
    @"C:\DataExport\SalesOrders201206.dat" |]

let itemQuery = MatrixQuery(filenames)
let products = itemQuery.GetRecommendations([| 860; 870; 873 |], 25)
In extracting the row vector associated with the required product one could just have used coMatrix.Row(item), but this creates a copy of the vector. To avoid this the code just enumerates the required matrix row. Internally the sparse matrix maintains three separate arrays for the column and row offsets and the sparse values. Using these internal arrays and the associated element-based operations would speed up obtaining recommendations, but currently these properties are not exposed. If one uses the sparse matrix from the F# PowerPack then one can operate in this fashion using the following code:
let getQueueSM (products:int array) =
    // Define the priority queue and lookup table
    let queue = PriorityQueue(coMatrix.NumCols)
    let lookup = HashSet(products)

    // Add the items into a priority queue
    products
    |> Array.iter (fun item ->
        let last = coMatrix.InternalSparseRowOffsets.Length - 1
        if item >= 0 && item <= last then
            // Range of positions in the value arrays that belong to this row
            let (startI, endI) =
                if item = last then (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseRowOffsets.[item])
                else (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseRowOffsets.[item + 1] - 1)
            seq {
                for idx = startI to endI do
                    // idx is a position in the value arrays; the exclusion test must use the product (column)
                    let product = coMatrix.InternalSparseColumnValues.[idx]
                    if not (lookup.Contains(product)) then
                        yield KeyValuePair(-coMatrix.InternalSparseValues.[idx], product)
            }
            |> queue.Merge)

    // Return the queue
    queue
In considering a shopping basket I have assumed that each product has been selected only once. You could have the situation where one wanted to take into consideration the quantity of each product selected. In this instance one would perform a scalar multiplication of the corresponding product vectors by the quantity selected. This would prioritize recommendations associated with the products that the user has selected in larger quantities.
Although this code does a lot of element-wise operations, as mentioned earlier, one can think of the operations in pure matrix terms. In this case one would just multiply the sparse matrix by a sparse column vector representing the items from which to make the recommendations.
Consider the previous sparse matrix example, and say one had a basket consisting of 2 of product 1002 and 1 of product 1004:
Product ID | 1001 | 1002 | 1003 | 1004 | 1005 |   | Product ID | Qty |   | Product ID | Rec |
1001       |      | 1    | 3    | 1    | 1    |   | 1001       |     |   | 1001       | 3   |
1002       | 1    |      | 3    |      |      |   | 1002       | 2   |   | 1002       |     |
1003       | 3    | 3    |      | 4    | 4    | × | 1003       |     | = | 1003       | 10  |
1004       | 1    |      | 4    |      | 2    |   | 1004       | 1   |   | 1004       |     |
1005       | 1    |      | 4    | 2    |      |   | 1005       |     |   | 1005       | 2   |
In this case it should be easy to see the recommendation for products should be 1003, 1001, and lastly 1005.
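The same multiplication can be checked with plain arrays. This is a dense sketch for the five-product sample only, with the blank cells written as 0.0; a real implementation would use the sparse matrix and vector types instead.

```fsharp
let products = [| 1001; 1002; 1003; 1004; 1005 |]

// The sample co-occurrence matrix, with blank cells written as 0.0.
let matrix =
    array2D
        [ [ 0.0; 1.0; 3.0; 1.0; 1.0 ]
          [ 1.0; 0.0; 3.0; 0.0; 0.0 ]
          [ 3.0; 3.0; 0.0; 4.0; 4.0 ]
          [ 1.0; 0.0; 4.0; 0.0; 2.0 ]
          [ 1.0; 0.0; 4.0; 2.0; 0.0 ] ]

// Basket quantities as a column vector: 2 of product 1002 and 1 of product 1004.
let basket = [| 0.0; 2.0; 0.0; 1.0; 0.0 |]

// Matrix times vector: each score is the dot product of a row with the basket.
let scores =
    Array.init products.Length (fun row ->
        Seq.sum (seq { for col in 0 .. products.Length - 1 -> matrix.[row, col] * basket.[col] }))
// scores = [| 3.0; 0.0; 10.0; 0.0; 2.0 |]

// Drop basket products and zero scores, then order by score.
let ranked =
    products
    |> Array.mapi (fun i product -> (product, scores.[i], basket.[i]))
    |> Array.filter (fun (_, score, qty) -> qty = 0.0 && score > 0.0)
    |> Array.sortByDescending (fun (_, score, _) -> score)
    |> Array.map (fun (product, _, _) -> product)
// ranked = [| 1003; 1001; 1005 |]
```

For example, the score for 1003 is 3 × 2 (from product 1002) plus 4 × 1 (from product 1004), giving 10.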
Conclusion
The approach mentioned here will probably work well for cases where one has hundreds of thousands of products with similarities between thousands of items, and tens of millions of orders to be considered. For most situations this should suffice. However, if you are not in this bracket, in my next post I will show how this approach can be mapped over to Hadoop and MapReduce, allowing for even greater scale.
Also, in a future post I will port the code to use the Cloud Numerics implementation of matrices.