Scaling up Scikit-Learns Random Projection using Apache Spark
By Sashi Dareddy, Lead Data Scientist
What is Random Projection (RP)?
Random Projection is a mathematical technique to reduce the dimensionality of a problem much like Singular Value Decomposition (SVD) or Principal Component Analysis (PCA) but only simpler & computationally faster.
[Throughout this article, I will use Random Projection and Sparse Random Projection interchangeably.]
It is particularly useful when:
- Calculating PCA/SVD can be very prohibitive in terms of both running time & memory/hardware constraints.
- Working with extremely large dimensional sparse datasets with both large n (>> 100million rows/samples) & large m (>> 10 million columns/features) in a distributed setting.
- When you require sparsity in the low-dimensional projected data
Why can't we use Scikit-Learn?
You can use Scikit-Learn if your dataset is small enough to fit in a single machine's memory. But, when it comes to the actual projection you will find that it doesn't scale well beyond a single machine's resources.
Why can't we use Apache Spark?
As at version 2.1.0 of Apache Spark MLLib, the following dimensionality reduction techniques are available:
- Singular Value Decomposition (SVD)
- Principal Component Analysis (PCA)
There has been at least one attempt to implement Random Projection in Apache Spark MLLib but those efforts don't appear to have made it through to the latest release.
In this article, I will present a recipe to perform Random Projection using PySpark. It brings the scalability of Apache Spark to the Random Projection implementation in Scikit-Learn. As a bonus, you can extend the idea presented in this article to perform general sparse matrix by sparse matrix multiplication (as long as one of the sparse matrix is small enough to fit in memory) resulting in another sparse matrix.
Further reading:
- 4.5. Random Projection - particularly, The Johnson-Lindenstrauss lemma and Sparse random projection sections
So, how do we apply Random Projection?
There are two main steps in projecting a n x m matrix into a low dimensional space using Random Projection:
- Generating a m x p Projection Matrix with a pre-specified sparsity factor - this is where we will leverage Scikit-Learns implementation of Sparse Random Projection and generate the projection matrix.
- Matrix multiplication - wherein we multiply an n x m input dataset with an m x p Projection Matrix yielding a new n x p matrix (which is said to be projected into a lower dimension) - we will leverage Scipy & Spark to deliver the large scale sparse matrix by sparse matrix multiplication resulting in another sparse matrix in a lower dimension
The Setup
Here, I will show you how to apply Random Projection by way of an example.
Data
We will be working with KDD2012 datasets available in LibSVM format.
The training dataset has 119,705,032 rows and 54,686,452 features - we will apply Random Projection and transform this dataset into one which has 119,705,032 rows and ~4k features. In order to demonstrate the scalability of my approach, I'm going to artificially increase the size of the training set by a factor of 9 - giving us just over 1 Billion rows.
Note, even the transformed dataset needs to be in sparse format otherwise the dataset could take up ~16TB in dense format.
Compute Resources
I will be using a 6-node (90-core, 300G RAM) Spark HDInsight Cluster from Microsoft Azure
Microsoft is giving away £150 in free credit to experiment in Azure.
I will also be using Azure Storage, particularly the Blob Service to store my input/output datasets.
It roughly takes 1ms per row (single core performance) including I/O to process the data. To project a dataset with 1 billion rows and 54 million columns into a new dataset with 1 billion rows and 4096 features took me about 3.75 hours on the above-mentioned cluster.
Memory Consideration
Your spark executors would need enough RAM to hold 2 x size of a Spark DataFrame partition plus a projection matrix (this is usually a few 100 MBs). If you are running low on memory, then you can repartition your Spark DataFrame to hold smaller chunks of data.
Code walkthrough:
There are two versions of random projection code depending on whether you want to run the code on a single machine or on a cluster. Refer to ./code/localmode/ or ./code/clustermode/ depending on your requirement. But here I will discuss the cluster mode code.
In [1]: First, let's get the required imports out of the way. Of particular note is the johnson_lindenstrauss_min_dim function - this function, given an input matrix, returns the no. of dimensions the projected space should have. In my case I'm going to fix the no. of dimensions at 4096.
# imports import logging import os import numpy as np import math import scipy.sparse as ssp from pyspark.sql import functions as f from sklearn.random_projection import johnson_lindenstrauss_min_dim, SparseRandomProjection from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.ml.linalg import Vectors, VectorUDT from sklearn.externals import joblib
In [2]: Below we will generate a Sparse Random Projection Matrix which will be a Scipy's CSR matrix which needs to be saved to disk to transform future data.
Note: the dimensionality of the reduced dimension space is independent of the no. of features (in our case 54 million) - it only depends on the no. of rows (in our case 1 Billion) according to the The Johnson-Lindenstrauss lemma.
# generating the random projection matrix dummy_X_rows = train.count() # 1,077,345,288 rows dummy_X_cols = train.first()["features"].size # 54,686,452 features # we only use the shape-info of this dummy matrix dummy_X = ssp.csr_matrix((dummy_X_rows, dummy_X_cols), dtype=np.float32)
# find the optimal (conservative estimate) no. of dimensions required according to the # johnson_lindenstrauss_min_dim function # rproj_ndim = johnson_lindenstrauss_min_dim(dummy_X_rows, eps=0.2) # returns 4250
rproj_ndim = 4096
# using a fixed value instead of johnson_lindenstrauss_min_dim suggestion
logging.info("Fitting Sparse Random Projection to have %d columns after projection " % rproj_ndim) srp = SparseRandomProjection(n_components=rproj_ndim, random_state=123)
srp.fit(dummy_X) logging.info("Saving the projection matrix to disk... ") joblib.dump(srp, os.path.join("/tmp","srp.pkl"))
In [3]: Here we will define a Python function which takes a whole Spark DataFrame partition and a projection matrix to return projected data. In a nutshell, this function converts a whole Spark DataFrame partition into a Scipy CSR matrix and then simply multiplies it with our projection matrix.
def random_project_mappartitions_function(rdd_row_iterator, local_csr_matrix):
""" This function is intended to be used in a <df>.rdd.mapPartitions(lambda rdd_map: random_project_mapf\ (rdd_map, local_rnd_mat)) setting. :param rdd_row_iterator: a list of n-dim sparse vectors :param local_csr_matrix: the projection matrix - should have dimensions: n x p :return: a list of p-dim sparse-vectors - same length as input """ keys = []
# this will be a list of single row sparsevectors transformed into scipy csr matrix features_single_row_matrix_list = [] PROJECT_DIM_SIZE = local_csr_matrix.shape[1]
for row in rdd_row_iterator: # capture keys if "label" in row: keys.append((row["id"], row["label"])) else: keys.append((row["id"]))
# work on values: feature_dim_size = row["features"].size # feature dimensionality before projection col_indices = row["features"].indices row_indices = [0] * len(col_indices) # defaulting to 0 as we are creating single row matrix data = row["features"].values.astype(np.float32)
feature_mat = ssp.coo_matrix((data, (row_indices, col_indices)), shape=(1, feature_dim_size)).tocsr() features_single_row_matrix_list.append(feature_mat)
# vstacking single row matrices into one large sparse matrix features_matrix = ssp.vstack(features_single_row_matrix_list)
del features_single_row_matrix_list
projected_features = features_matrix.dot(local_csr_matrix)
del features_matrix, local_csr_matrix
projected_features_list = (Vectors.sparse(PROJECT_DIM_SIZE, zip(i.indices, i.data))
for i in projected_features)
if "label" in row: return zip((i[0] for i in keys), (i[1] for i in keys), projected_features_list) else: return zip((i[0] for i in keys), projected_features_list)
In [4]: and here is where the main action takes place – the projected output is saved to a parquet file.
if __name__ == '__main__': N_FEATURES=54686452
# these are the no. of features in the dataset we are going to use in this app logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.INFO)
# fire up a spark session spark = SparkSession \ .builder \ .appName("PySpark Random Projection Demo") \ .getOrCreate()
sc = spark.sparkContext
DATA_DIR = 'wasb://kdd12-blob@sashistorage.blob.core.windows.net/'
# Azure Storage blob
train = spark.read.format("libsvm").load(DATA_DIR, numFeatures=N_FEATURES)\ .withColumn("id", f.monotonically_increasing_id()) print(train.show())
train_schema = StructType( [StructField("id", LongType(), False) ,StructField("label", FloatType(), False) , StructField("features", VectorUDT(), False) ] )
# generating the random projection matrix dummy_X_rows = train.count() dummy_X_cols = train.first()["features"].size dummy_X = ssp.csr_matrix((dummy_X_rows, dummy_X_cols), dtype=np.float32) # the shape-only of this dummy
# find the optimal (conservative estimate) no. of dimensions required according to the
# johnson_lindenstrauss_min_dim function
# rproj_ndim = johnson_lindenstrauss_min_dim(dummy_X_rows, eps=0.2) # returns 4250
rproj_ndim = 4096
logging.info("Fitting Sparse Random Projection to have %d columns after projection " % rproj_ndim) srp = SparseRandomProjection(n_components=rproj_ndim, random_state=123)
srp.fit(dummy_X) logging.info("Saving the projection matrix to disk... ") joblib.dump(srp, os.path.join("/tmp","srp.pkl"))
local_rnd_mat = srp.components_.T.astype(np.float32)
# broadcast the local_rnd_mat so it is available on all nodes local_rnd_mat_bc_var = sc.broadcast(local_rnd_mat)
logging.info("Applying random projection to rdd map partitions") train_projected_df = train.rdd\ .mapPartitions(lambda rdd_map_partition: random_project_mappartitions_function(rdd_map_partition,local_rnd_mat_bc_var.value))\ .toDF(train_schema)
logging.info("Writing projected data to disk...") train_projected_df\ .write\ .mode("overwrite")\ .parquet(DATA_DIR+"/train_features_random_projected.parquet/")
logging.info("Sample rows from training set before projection...") print(train.show()) logging.info("Sample rows from training set after projection...") print(spark.read.parquet(DATA_DIR+"/train_features_random_projected.parquet/").show())
spark.stop()
In [5]: Finally, here is how we submit the PySpark app to the cluster.
#!/usr/bin/env bash echo "Submitting PySpark app..." spark-submit \ --master yarn \ --executor-memory 3G \ --driver-memory 6G \ --num-executors 85 \ --executor-cores 1 \ code/clustermode/randomProjection.py
echo "Exporting Projection Matrix to Azure Storage..." zip /tmp/srp.pkl.zip /tmp/srp.pkl*
export AZURE_STORAGE_ACCOUNT=<storage account name> export AZURE_STORAGE_ACCESS_KEY=<storage account access key> azure storage blob upload --container modelling-outputs --file /tmp/srp.pkl.zip
For the full code refer to https://github.com/SashiDareddy/RandomProjection