## Getting Started

To get started, add the following dependency to the pom:

```xml
<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-flink_2.10</artifactId>
  <version>0.12.0</version>
</dependency>
```
Here is how to use the Flink backend:

```scala
import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._

object ReadCsvExample {

  def main(args: Array[String]): Unit = {
    val filePath = "path/to/the/input/file"

    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val ctx = new FlinkDistributedContext(env)

    val drm = readCsv(filePath, delim = "\t", comment = "#")
    val C = drm.t %*% drm
    println(C.collect)
  }

}
```
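As a dependency-free illustration of what `drm.t %*% drm` evaluates to, here is a plain-Scala sketch of the Gram matrix A^T A, whose (i, j) entry is the dot product of columns i and j of A. The object and function names are made up for illustration and are not part of the Mahout API:

```scala
// Plain-Scala sketch (not Mahout code): the Gram matrix A^T A,
// which is the result `drm.t %*% drm` computes in a distributed way.
object GramMatrixSketch {

  // entry (i, j) of A^T A is the dot product of columns i and j of A
  def gram(a: Array[Array[Double]]): Array[Array[Double]] = {
    val cols = a.head.length
    Array.tabulate(cols, cols) { (i, j) =>
      a.map(row => row(i) * row(j)).sum
    }
  }
}
```

For example, for A = [[1, 2], [3, 4]] this yields [[10, 14], [14, 20]].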
## Current Status

The umbrella JIRA for the Flink backend is MAHOUT-1570, which has been fully implemented.
### Implemented
- MAHOUT-1701 Mahout DSL for Flink: implement `AtB`, `ABt` and `AtA` operators
- MAHOUT-1702 implement element-wise operators (like `A + 2` or `A + B`)
- MAHOUT-1703 implement `cbind` and `rbind`
- MAHOUT-1709 implement slicing (like `A(1 to 10, ::)`)
- MAHOUT-1710 implement right in-core matrix multiplication (`A %*% B` when `B` is in-core)
- MAHOUT-1711 implement broadcasting
- MAHOUT-1712 implement operators `At`, `Ax`, `Atx`; `Ax` and `At` are implemented
- MAHOUT-1734 implement I/O: it should be possible to read the results of Flink bindings
- MAHOUT-1747 add support for different types of indexes (String, Long, etc.); now supports `Int`, `Long` and `String`
- MAHOUT-1748 switch to the Flink Scala API
- MAHOUT-1749 Implement `Atx`
- MAHOUT-1750 Implement `ABt`
- MAHOUT-1751 Implement `AtA`
- MAHOUT-1755 Flush intermediate results to FS - Flink, unlike Spark, does not store intermediate results in memory.
- MAHOUT-1764 Add standard backend tests for Flink
- MAHOUT-1765 Add documentation about Flink backend
- MAHOUT-1776 Refactor common Engine agnostic classes to Math-Scala module
- MAHOUT-1777 move HDFSUtil classes into the HDFS module
- MAHOUT-1804 Implement drmParallelizeWithRowLabels(..) in Flink
- MAHOUT-1805 Implement allReduceBlock(..) in Flink bindings
- MAHOUT-1809 Failing tests in flink-bindings: dals and dspca
- MAHOUT-1810 Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)
- MAHOUT-1812 Implement drmParallelizeWithEmptyLong(..) in flink bindings
- MAHOUT-1814 Implement drm2intKeyed in flink bindings
- MAHOUT-1815 dsqDist(X,Y) and dsqDist(X) failing in flink tests
- MAHOUT-1816 Implement newRowCardinality in CheckpointedFlinkDrm
- MAHOUT-1817 Implement caching in Flink Bindings
- MAHOUT-1818 dals test failing in Flink Bindings
- MAHOUT-1819 Set the default Parallelism for Flink execution in FlinkDistributedContext
- MAHOUT-1820 Add a method to generate Tuple<PartitionId, Partition elements count> to support Flink backend
- MAHOUT-1821 Use a mahout-flink-conf.yaml configuration file for Mahout specific Flink configuration
- MAHOUT-1822 Update NOTICE.txt, License.txt to add Apache Flink
- MAHOUT-1823 Modify MahoutFlinkTestSuite to implement FlinkTestBase
- MAHOUT-1824 Optimize FlinkOpAtA to use upper triangular matrices
- MAHOUT-1825 Add List of Flink algorithms to Mahout wiki page
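As a small, hedged illustration of the element-wise semantics listed under MAHOUT-1702, the following plain-Scala sketch operates on nested arrays; it is not the Mahout DSL, and `Mat`, `plusScalar` and `plus` are illustrative names:

```scala
// Plain-Scala sketch of element-wise semantics (A + 2, A + B),
// not Mahout internals; `Mat` is an illustrative alias.
object ElementwiseSketch {
  type Mat = Array[Array[Double]]

  // A + s: add a scalar to every entry
  def plusScalar(a: Mat, s: Double): Mat = a.map(_.map(_ + s))

  // A + B: entry-wise sum of two same-shaped matrices
  def plus(a: Mat, b: Mat): Mat =
    a.zip(b).map { case (ra, rb) =>
      ra.zip(rb).map { case (x, y) => x + y }
    }
}
```

In the actual bindings these operators run over distributed row partitions; the sketch only pins down the per-entry arithmetic.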
## Tests

There is a set of standard tests that all engines should pass (see MAHOUT-1764):

- `DistributedDecompositionsSuite`
- `DrmLikeOpsSuite`
- `DrmLikeSuite`
- `RLikeDrmOpsSuite`
These are the Flink-backend specific tests:

- `DrmLikeOpsSuite` for operations like `norm`, `rowSums`, `rowMeans`
- `RLikeOpsSuite` for basic linear algebra like `A.t %*% A`, `A.t %*% x`, etc.
- `LATestSuite` for specific operators like `AtB`, `Ax`, etc.
- `UseCasesSuite` for more complex examples, like power iteration, ridge regression, etc.
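The row-wise operations exercised by `DrmLikeOpsSuite` can be sketched in plain Scala as follows, assuming `norm` denotes the Frobenius norm; the names mirror the DSL, but the code is illustrative only (Mahout's actual implementations are distributed):

```scala
// Dependency-free sketch of the row-wise operations exercised by
// DrmLikeOpsSuite. Names mirror the DSL; the code is illustrative Scala.
object RowOpsSketch {
  type Mat = Array[Array[Double]]

  // sum of each row
  def rowSums(a: Mat): Array[Double]  = a.map(_.sum)

  // mean of each row
  def rowMeans(a: Mat): Array[Double] = a.map(r => r.sum / r.length)

  // Frobenius norm: square root of the sum of squared entries
  def norm(a: Mat): Double = math.sqrt(a.flatten.map(x => x * x).sum)
}
```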
## Environment

For development, the minimal supported configuration is

When using Mahout, please import the following modules:

- `mahout-math`
- `mahout-math-scala`
- `mahout-flink_2.10`
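As a sketch, those modules might be declared in the pom as follows. The version follows the dependency shown at the top of this page; the Scala-suffixed artifactId for the math-scala module is an assumption, so verify the exact artifactIds against the Mahout release you use:

```xml
<!-- Illustrative dependency list; check artifactIds and version for your release -->
<dependencies>
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math</artifactId>
    <version>0.12.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math-scala_2.10</artifactId>
    <version>0.12.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-flink_2.10</artifactId>
    <version>0.12.0</version>
  </dependency>
</dependencies>
```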