Support for dimensional reduction
Matrix algebra underpins the way many Big Data algorithms and data
structures are composed: full-text search can be viewed as doing matrix
multiplication of the term-document matrix by the query vector (giving a
vector over documents where the components are the relevance score),
computing co-occurrences in a collaborative filtering context (people who
viewed X also viewed Y, or ratings-based CF like the Netflix Prize contest)
is taking the squaring the user-item interaction matrix, calculating users
who are k-degrees separated from each other in a social network or
web-graph can be found by looking at the k-fold product of the graph
adjacency matrix, and the list goes on (and these are all cases where the
linear structure of the matrix is preserved!)
Each of these examples deal with cases of matrices which tend to be
tremendously large (often millions to tens of millions to hundreds of
millions of rows or more, by sometimes a comparable number of columns), but
also rather sparse. Sparse matrices are nice in some respects: dense
matrices which are 10^7 on a side would have 100 trillion non-zero entries!
But the sparsity is often problematic, because any given two rows (or
columns) of the matrix may have zero overlap. Additionally, any
machine-learning work done on the data which comprises the rows has to deal
with what is known as “the curse of dimensionality”, and for example, there
are too many columns to train most regression or classification problems on
them independently.
One of the more useful approaches to dealing with such huge sparse data
sets is the concept of dimensionality reduction, where a lower dimensional
space of the original column (feature) space of your data is found /
constructed, and your rows are mapped into that subspace (or sub-manifold).
In this reduced dimensional space, “important” components to distance
between points are exaggerated, and unimportant ones washed away, and
additionally, sparsity of your rows is traded for drastically reduced
dimensional, but dense “signatures”. While this loss of sparsity can lead
to its own complications, a proper dimensionality reduction can help reveal
the most important features of your data, expose correlations among your
supposedly independent original variables, and smooth over the zeroes in
your correlation matrix.
One of the most straightforward techniques for dimensionality reduction is
the matrix decomposition: singular value decomposition, eigen
decomposition, non-negative matrix factorization, etc. In their truncated
form these decompositions are an excellent first approach toward linearity
preserving unsupervised feature selection and dimensional reduction. Of
course, sparse matrices which don’t fit in RAM need special treatment as
far as decomposition is concerned. Parallelizable and/or stream-oriented
algorithms are needed.
Singular Value Decomposition
Currently implemented in Mahout (as of 0.3, the first release with MAHOUT-180 applied), are two scalable implementations of SVD, a stream-oriented implementation using the Asymmetric Generalized Hebbian Algorithm outlined in Genevieve Gorrell & Brandyn Webb’s paper (Gorrell and Webb 2005
); and there is a [Lanczos | http://en.wikipedia.org/wiki/Lanczos_algorithm]
implementation, both single-threaded, and in the
o.a.m.math.decomposer.lanczos package (math module), as a hadoop map-reduce
(series of) job(s) in o.a.m.math.hadoop.decomposer package (core module).
Coming soon: stochastic decomposition.
See also: https://cwiki.apache.org/confluence/display/MAHOUT/SVD+-+Singular+Value+Decomposition
Lanczos
The Lanczos algorithm is designed for eigen-decomposition, but like any
such algorithm, getting singular vectors out of it is immediate (singular
vectors of matrix A are just the eigenvectors of A^t * A or A * A^t).
Lanczos works by taking a starting seed vector v (with cardinality equal
to the number of columns of the matrix A), and repeatedly multiplying A by
the result: v’ = A.times(v) (and then subtracting off what is
proportional to previous v’’s, and building up an auxiliary matrix of
projections). In the case where A is not square (in general: not
symmetric), then you actually want to repeatedly multiply AA^t by *v:
v’ = (A * A^t).times(v), or equivalently, in Mahout,
A.timesSquared(v) (timesSquared is merely an optimization: by changing
the order of summation in AA^t.times(v*), you can do the same computation
as one pass over the rows of A instead of two).
After k iterations of v_i = A.timesSquared(v_(i-1)), a k- by -k
tridiagonal matrix has been created (the auxiliary matrix mentioned above),
out of which a good (often extremely good) approximation to k of the
singular values (and with the basis spanned by the v_i, the k singular
vectors may also be extracted) of A may be efficiently extracted. Which
k? It’s actually a spread across the entire spectrum: the first few will
most certainly be the largest singular values, and the bottom few will be
the smallest, but you have no guarantee that just because you have the n’th
largest singular value of A, that you also have the (n-1)’st as well. A
good rule of thumb is to try and extract out the top 3k singular vectors
via Lanczos, and then discard the bottom two thirds, if you want primarily
the largest singular values (which is the case for using Lanczos for
dimensional reduction).
Parallelization Stragegy
Lanczos is “embarassingly parallelizable”: matrix multiplication of a
matrix by a vector may be carried out row-at-a-time without communication
until at the end, the results of the intermediate matrix-by-vector outputs
are accumulated on one final vector. When it’s truly A.times(v), the
final accumulation doesn’t even have collision / synchronization issues
(the outputs are individual separate entries on a single vector), and
multicore approaches can be very fast, and there should also be tricks to
speed things up on Hadoop. In the asymmetric case, where the operation is
A.timesSquared(v), the accumulation does require synchronization (the
vectors to be summed have nonzero elements all across their range), but
delaying writing to disk until Mapper close(), and remembering that having
a Combiner be the same as the Reducer, the bottleneck in accumulation is
nowhere near a single point.
Mahout usage
The Mahout DistributedLanzcosSolver is invoked by the
/bin/mahout svd command. This command takes the following
arguments (which can be reproduced by just entering the command with no
arguments):
Job-Specific Options:
--input (-i) input Path to job input directory.
--output (-o) output The directory pathname for output.
--numRows (-nr) numRows Number of rows of the input matrix
--numCols (-nc) numCols Number of columns of the input matrix
--rank (-r) rank Desired decomposition rank (note:
only roughly 1/4 to 1/3 of these will
have the top portion of the spectrum)
--symmetric (-sym) symmetric Is the input matrix square and
symmetric?
--cleansvd (-cl) cleansvd Run the EigenVerificationJob to clean
the eigenvectors after SVD
--maxError (-err) maxError Maximum acceptable error
--minEigenvalue (-mev) minEigenvalue Minimum eigenvalue to keep the vector for
--inMemory (-mem) inMemory Buffer eigen matrix into memory (if you have enough!)
--help (-h) Print out help
--tempDir tempDir Intermediate output directory
--startPhase startPhase First phase to run
--endPhase endPhase Last phase to run
The short form invocation may be used to perform the SVD on the input data:
/bin/mahout svd \
--input (-i) \
--output (-o) \
--numRows (-nr) \
--numCols (-nc) \
--rank (-r) \
--symmetric (-sym)
The --input argument is the location on HDFS where a
SequenceFile<Writable,VectorWritable> (preferably
SequentialAccessSparseVectors instances) lies which you wish to decompose.
Each vector of which has --numcols entries. --numRows is the number of
input rows and is used to properly size the matrix data structures.
After execution, the --output directory will have a file named
"rawEigenvectors" containing the raw eigenvectors. As the
DistributedLanczosSolver sometimes produces "extra" eigenvectors, whose
eigenvalues aren't valid, and also scales all of the eigenvalues down by
the max eignenvalue (to avoid floating point overflow), there is an
additional step which spits out the nice correctly scaled (and
non-spurious) eigenvector/value pairs. This is done by the "cleansvd" shell
script step (c.f. EigenVerificationJob).
If you have run he short form svd invocation above and require this
"cleaning" of the eigen/singular output you can run "cleansvd" as a
separate command:
/bin/mahout cleansvd \
--eigenInput \
--corpusInput \
--output \
--maxError <maximum allowed error. Default is 0.5> \
--minEigenvalue <minimum allowed eigenvalue. Default is 0.0> \
--inMemory
The --corpusInput is the input path from the previous step, --eigenInput is
the output from the previous step (