## Getting Started

To get started, add the following dependency to your `pom.xml`:

    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-flink_2.10</artifactId>
      <version>0.12.0</version>
    </dependency>

Here is how to use the Flink backend:

    import org.apache.flink.api.scala._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.flinkbindings._

    object ReadCsvExample {

      def main(args: Array[String]): Unit = {
        val filePath = "path/to/the/input/file"

        // Wrap the Flink execution environment in a Mahout distributed context
        val env = ExecutionEnvironment.getExecutionEnvironment
        implicit val ctx = new FlinkDistributedContext(env)

        // Read a tab-delimited file into a distributed row matrix (DRM)
        val drm = readCsv(filePath, delim = "\t", comment = "#")

        // Compute A' * A and collect the result into an in-core matrix
        val C = drm.t %*% drm
        println(C.collect)
      }

    }

## Current Status

The umbrella JIRA issue for the Flink backend is [MAHOUT-1570](https://issues.apache.org/jira/browse/MAHOUT-1570), which has been fully implemented.

### Implemented

* [MAHOUT-1701](https://issues.apache.org/jira/browse/MAHOUT-1701) Mahout DSL for Flink: implement AtB ABt and AtA operators
* [MAHOUT-1702](https://issues.apache.org/jira/browse/MAHOUT-1702) implement element-wise operators (like `A + 2` or `A + B`)
* [MAHOUT-1703](https://issues.apache.org/jira/browse/MAHOUT-1703) implement `cbind` and `rbind`
* [MAHOUT-1709](https://issues.apache.org/jira/browse/MAHOUT-1709) implement slicing (like `A(1 to 10, ::)`)
* [MAHOUT-1710](https://issues.apache.org/jira/browse/MAHOUT-1710) implement right in-core matrix multiplication (`A %*% B` when `B` is in-core)
* [MAHOUT-1711](https://issues.apache.org/jira/browse/MAHOUT-1711) implement broadcasting
* [MAHOUT-1712](https://issues.apache.org/jira/browse/MAHOUT-1712) implement operators `At`, `Ax`, `Atx` - `Ax` and `At` are implemented
* [MAHOUT-1734](https://issues.apache.org/jira/browse/MAHOUT-1734) implement I/O - should be able to read results of Flink bindings
* [MAHOUT-1747](https://issues.apache.org/jira/browse/MAHOUT-1747) add support for different types of indexes (String, long, etc) - now supports `Int`, `Long` and `String`
* [MAHOUT-1748](https://issues.apache.org/jira/browse/MAHOUT-1748) switch to Flink Scala API
* [MAHOUT-1749](https://issues.apache.org/jira/browse/MAHOUT-1749) Implement `Atx`
* [MAHOUT-1750](https://issues.apache.org/jira/browse/MAHOUT-1750) Implement `ABt`
* [MAHOUT-1751](https://issues.apache.org/jira/browse/MAHOUT-1751) Implement `AtA`
* [MAHOUT-1755](https://issues.apache.org/jira/browse/MAHOUT-1755) Flush intermediate results to FS - Flink, unlike Spark, does not store intermediate results in memory.
* [MAHOUT-1764](https://issues.apache.org/jira/browse/MAHOUT-1764) Add standard backend tests for Flink
* [MAHOUT-1765](https://issues.apache.org/jira/browse/MAHOUT-1765) Add documentation about Flink backend
* [MAHOUT-1776](https://issues.apache.org/jira/browse/MAHOUT-1776) Refactor common Engine agnostic classes to Math-Scala module
* [MAHOUT-1777](https://issues.apache.org/jira/browse/MAHOUT-1777) move HDFSUtil classes into the HDFS module
* [MAHOUT-1804](https://issues.apache.org/jira/browse/MAHOUT-1804) Implement drmParallelizeWithRowLabels(..) in Flink
* [MAHOUT-1805](https://issues.apache.org/jira/browse/MAHOUT-1805) Implement allReduceBlock(..) in Flink bindings
* [MAHOUT-1809](https://issues.apache.org/jira/browse/MAHOUT-1809) Failing tests in flink-bindings: dals and dspca
* [MAHOUT-1810](https://issues.apache.org/jira/browse/MAHOUT-1810) Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue)
* [MAHOUT-1812](https://issues.apache.org/jira/browse/MAHOUT-1812) Implement drmParallelizeWithEmptyLong(..) in flink bindings
* [MAHOUT-1814](https://issues.apache.org/jira/browse/MAHOUT-1814) Implement drm2intKeyed in flink bindings
* [MAHOUT-1815](https://issues.apache.org/jira/browse/MAHOUT-1815) dsqDist(X,Y) and dsqDist(X) failing in flink tests
* [MAHOUT-1816](https://issues.apache.org/jira/browse/MAHOUT-1816) Implement newRowCardinality in CheckpointedFlinkDrm
* [MAHOUT-1817](https://issues.apache.org/jira/browse/MAHOUT-1817) Implement caching in Flink Bindings
* [MAHOUT-1818](https://issues.apache.org/jira/browse/MAHOUT-1818) dals test failing in Flink Bindings
* [MAHOUT-1819](https://issues.apache.org/jira/browse/MAHOUT-1819) Set the default Parallelism for Flink execution in FlinkDistributedContext
* [MAHOUT-1820](https://issues.apache.org/jira/browse/MAHOUT-1820) Add a method to generate Tuple> to support Flink backend
* [MAHOUT-1821](https://issues.apache.org/jira/browse/MAHOUT-1821) Use a mahout-flink-conf.yaml configuration file for Mahout specific Flink configuration
* [MAHOUT-1822](https://issues.apache.org/jira/browse/MAHOUT-1822) Update NOTICE.txt, License.txt to add Apache Flink
* [MAHOUT-1823](https://issues.apache.org/jira/browse/MAHOUT-1823) Modify MahoutFlinkTestSuite to implement FlinkTestBase
* [MAHOUT-1824](https://issues.apache.org/jira/browse/MAHOUT-1824) Optimize FlinkOpAtA to use upper triangular matrices
* [MAHOUT-1825](https://issues.apache.org/jira/browse/MAHOUT-1825) Add List of Flink algorithms to Mahout wiki page

### Tests

There is a set of standard tests that all engines should pass (see [MAHOUT-1764](https://issues.apache.org/jira/browse/MAHOUT-1764)):

* `DistributedDecompositionsSuite`
* `DrmLikeOpsSuite`
* `DrmLikeSuite`
* `RLikeDrmOpsSuite`

The following tests are specific to the Flink backend:

* `DrmLikeOpsSuite` for operations like `norm`, `rowSums`, `rowMeans`
* `RLikeOpsSuite` for basic LA like `A.t %*% A`, `A.t %*% x`, etc
* `LATestSuite` tests for specific operators like `AtB`, `Ax`, etc
* `UseCasesSuite` has more complex examples, like power iteration, ridge regression, etc

## Environment

For development, the minimal supported configuration is:

* [JDK 1.7](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html)
* Scala 2.10

When using Mahout, please import the following modules:

* `mahout-math`
* `mahout-math-scala`
* `mahout-flink_2.10`
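
The three modules above could be declared in a Maven `pom.xml` roughly as follows. This is only a sketch: the version `0.12.0` is carried over from the Getting Started dependency, and the `_2.10` suffix on the `mahout-math-scala` artifact is an assumption about how the Scala-versioned artifact is published, so check the coordinates against the repository you build from.

```xml
<dependencies>
  <!-- Core in-core math library -->
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math</artifactId>
    <version>0.12.0</version>
  </dependency>

  <!-- Scala DSL; the _2.10 suffix is assumed, not stated in this document -->
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math-scala_2.10</artifactId>
    <version>0.12.0</version>
  </dependency>

  <!-- Flink bindings, as listed in Getting Started -->
  <dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-flink_2.10</artifactId>
    <version>0.12.0</version>
  </dependency>
</dependencies>
```

Keeping all three artifacts on the same version avoids mixing incompatible builds of the DSL and the backend.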