Announcing Flink 0.9.0-milestone1 preview release

13 Apr 2015

The Apache Flink community is pleased to announce the availability of the 0.9.0-milestone-1 release. This release is a preview of the upcoming 0.9.0 release and contains many of the new features that will ship with 0.9. Interested users are encouraged to try it out and give feedback. As the version number indicates, this is a preview release that contains known issues.

You can download the release here and check out the latest documentation here. Feedback through the Flink mailing lists is, as always, very welcome!

New Features

Table API

Flink’s new Table API offers a higher-level abstraction for interacting with structured data sources. The Table API allows users to execute logical, SQL-like queries on distributed data sets and to freely mix declarative queries with regular Flink operators. Here is an example that groups and joins two tables:

val clickCounts = clicks
  .groupBy('userId).select('userId, 'url.count as 'count)

val activeUsers = users.join(clickCounts)
  .where('id === 'userId && 'count > 10).select('username, 'count, ...)

Tables consist of logical attributes that can be selected by name rather than referenced through physical Java and Scala data types. This removes a lot of boilerplate code for common ETL tasks and raises the level of abstraction for Flink programs. Tables are available for both static and streaming data sources (DataSet and DataStream APIs).

Check out the Table guide for Java and Scala here.

Gelly Graph Processing API

Gelly is a Java Graph API for Flink. It contains a set of utilities for graph analysis, support for iterative graph processing, and a library of graph algorithms. Gelly exposes a Graph data structure that wraps DataSets for vertices and edges, methods for creating graphs from DataSets, graph transformations and utilities (e.g., in- and out-degrees of vertices), neighborhood aggregations, iterative vertex-centric graph processing, and a library of common graph algorithms, including PageRank, SSSP, label propagation, and community detection.

Gelly internally builds on top of Flink’s delta iterations. Iterative graph algorithms are executed leveraging mutable state, achieving performance similar to specialized graph processing systems.

Gelly will eventually subsume Spargel, Flink’s Pregel-like API. Check out the Gelly guide here.

Flink Machine Learning Library

This release includes the first version of Flink’s Machine Learning library. The library’s pipeline approach, which has been strongly inspired by scikit-learn’s abstraction of transformers and estimators, makes it easy to quickly set up a data processing pipeline and to get your job done.

Flink distinguishes between transformers and learners. Transformers are components which transform your input data into a new format, allowing you to extract features, cleanse your data, or sample from it. Learners, on the other hand, are the components which take your input data and train a model on it. The model you obtain from the learner can then be evaluated and used to make predictions on unseen data.

Currently, the machine learning library contains transformers and learners for several tasks. The library supports multiple linear regression using a stochastic gradient descent implementation to scale to large data sizes. Furthermore, it includes an alternating least squares (ALS) implementation to factorize large matrices. The matrix factorization can be used for collaborative filtering. An implementation of the communication-efficient distributed dual coordinate ascent (CoCoA) algorithm is the latest addition to the library. The CoCoA algorithm can be used to train distributed soft-margin SVMs.
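
To give a feel for the pipeline approach, here is a minimal, hypothetical sketch of training a multiple linear regression model and predicting on unseen data. Class names such as MultipleLinearRegression, LabeledVector, and DenseVector follow the library described above, but the exact packages and method signatures should be checked against the library documentation.

// Hypothetical sketch: train a linear model and apply it to unseen data.
// Imports and signatures may differ slightly in the milestone release.
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression

val env = ExecutionEnvironment.getExecutionEnvironment

// Labeled training data: (label, feature vector)
val training: DataSet[LabeledVector] = env.fromElements(
  LabeledVector(1.0, DenseVector(0.2, 3.4)),
  LabeledVector(2.5, DenseVector(1.1, 0.3)))

// Unlabeled data to predict on
val test: DataSet[DenseVector] = env.fromElements(DenseVector(0.5, 1.0))

// Configure the learner, train a model, and make predictions
val mlr = MultipleLinearRegression()
  .setIterations(10)
  .setStepsize(0.5)

mlr.fit(training)
val predictions = mlr.predict(test)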

Flink on Tez

We are introducing a new execution mode that allows running restricted Flink programs on top of Apache Tez. This mode retains Flink’s APIs and optimizer, as well as Flink’s runtime operators, but instead of wrapping those in Flink tasks that are executed by Flink TaskManagers, it wraps them in Tez runtime tasks and builds a Tez DAG that represents the program.

By using Flink on Tez, users have an additional choice for an execution platform for Flink programs. While Flink’s distributed runtime favors low latency, streaming shuffles, and iterative algorithms, Tez focuses on scalability and elastic resource usage in shared YARN clusters.

Get started with Flink on Tez here.

Reworked Distributed Runtime on Akka

Flink’s RPC system has been replaced by the widely adopted Akka framework. Akka’s concurrency model offers the right abstraction to develop a fast and robust distributed system. By using Akka’s own failure detection mechanism, the stability of Flink’s runtime is significantly improved, because the system can now react properly to node outages. Furthermore, Akka improves Flink’s scalability by introducing asynchronous messages to the system. These asynchronous messages allow Flink to be run on many more nodes than before.

Exactly-once processing on Kafka Streaming Sources

This release introduces stream processing with exactly-once delivery guarantees for Flink streaming programs that analyze streaming sources persisted by Apache Kafka. The system internally tracks the Kafka offsets to ensure that Flink can pick up data from Kafka where it left off in case of a failure.

Read here on how to use the persistent Kafka source.
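
For illustration only, consuming a Kafka topic with the persistent source from the Scala API might look roughly like the sketch below. The PersistentKafkaSource constructor arguments shown here (topic, deserialization schema, and Kafka consumer configuration) are an assumption, so please check the linked documentation for the exact API.

// Hypothetical sketch of the offset-tracking Kafka source; constructor arguments
// and package names are assumptions, see the documentation for the exact API.
import java.util.Properties

import kafka.consumer.ConsumerConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka 0.8 high-level consumer settings (ZooKeeper address and consumer group)
val props = new Properties()
props.setProperty("zookeeper.connect", "localhost:2181")
props.setProperty("group.id", "flink-kafka-example")

// Offsets are tracked internally so the source can resume where it left off after a failure
val kafkaStream: DataStream[String] = env.addSource(
  new PersistentKafkaSource[String]("my-topic", new SimpleStringSchema(), new ConsumerConfig(props)))

kafkaStream.print()
env.execute("Exactly-once Kafka source example")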

Improved YARN support

Flink’s YARN client contains several improvements, such as a detached mode for starting a YARN session in the background and the ability to submit a single Flink job to a YARN cluster without starting a session, including a “fire and forget” mode. Flink is now also able to reallocate failed YARN containers to maintain the size of the requested cluster. This feature allows users to implement fault-tolerant setups on top of YARN. There is also an internal Java API to deploy and control a running YARN cluster. It is used by system integrators to easily control Flink on YARN within their Hadoop 2 clusters.

See the YARN docs here.

More Improvements and Fixes

  • FLINK-1605: Flink no longer exposes its Guava and ASM dependencies to Maven projects depending on Flink. The maven-shade-plugin is used to relocate these dependencies into Flink’s own namespace, which allows users to use any Guava or ASM version.

  • FLINK-1417: Automatic recognition and registration of Java types with Kryo and the internal serializers: Flink has its own type handling and serialization framework, falling back to Kryo for types that it cannot handle. To get the best performance, Flink automatically registers all types a user uses in their program with Kryo. Flink also registers serializers for Protocol Buffers, Thrift, Avro and Joda-Time automatically. Users can also manually register serializers with Kryo (https://issues.apache.org/jira/browse/FLINK-1399)

  • FLINK-1296: Add support for sorting very large records

  • FLINK-1679: "degreeOfParallelism" methods renamed to "parallelism"

  • FLINK-1501: Add metrics library for monitoring TaskManagers

  • FLINK-1760: Add support for building Flink with Scala 2.11

  • FLINK-1648: Add a mode where the system automatically sets the parallelism to the available task slots

  • FLINK-1622: Add groupCombine operator

  • FLINK-1589: Add option to pass Configuration to LocalExecutor

  • FLINK-1504: Add support for accessing secured HDFS clusters in standalone mode

  • FLINK-1478: Add strictly local input split assignment

  • FLINK-1512: Add CsvReader for reading into POJOs.

  • FLINK-1461: Add sortPartition operator

  • FLINK-1450: Add Fold operator to the Streaming api

  • FLINK-1389: Allow setting custom file extensions for files created by the FileOutputFormat

  • FLINK-1236: Add support for localization of Hadoop Input Splits

  • FLINK-1179: Add button to JobManager web interface to request stack trace of a TaskManager

  • FLINK-1105: Add support for locally sorted output

  • FLINK-1688: Add socket sink

  • FLINK-1436: Improve usability of command line interface

March 2015 in the Flink community

07 Apr 2015

March has been a busy month in the Flink community.

A Flink runner for Google Cloud Dataflow was announced. See the blog posts by data Artisans and the Google Cloud Platform Blog. Google Cloud Dataflow programs can be written using an open-source SDK and run on multiple backends, either as a managed service inside Google's infrastructure or leveraging open-source runners, including Apache Flink.

The community has started an effort to better document the internals of Flink. Check out the first articles on the Flink wiki on how Flink manages memory, how tasks in Flink exchange data, type extraction and serialization in Flink, as well as how Flink builds on Akka for distributed coordination.

Also check out the new blog post on how Flink executes joins, with several insights into Flink's runtime.

Meetups and talks

Flink's machine learning efforts were presented at the Machine Learning Stockholm meetup group. The regular Berlin Flink meetup featured a talk on the past, present, and future of Flink. The talk is available on YouTube.

Table API in Scala and Java

The new Table API in Flink is now available in both Java and Scala. Check out the examples here (Java) and here (Scala).

Additions to the Machine Learning library

Flink's Machine Learning library is seeing quite a bit of traction. Recent additions include the CoCoA algorithm for distributed optimization.

Exactly-once delivery guarantees for streaming jobs

Flink streaming jobs now provide exactly once processing guarantees when coupled with persistent sources (notably Apache Kafka). Flink periodically checkpoints and persists the offsets of the sources and restarts from those checkpoints at failure recovery. This functionality is currently limited in that it does not yet handle large state and iterative programs.

Flink on Tez

A new execution environment enables non-iterative Flink jobs to use Tez as an execution backend instead of Flink's own network stack. Learn more here.

Peeking into Apache Flink's Engine Room

13 Mar 2015

Joins are prevalent operations in many data processing applications. Most data processing systems feature APIs that make joining data sets very easy. However, the internal algorithms for join processing are much more involved, especially if large data sets need to be handled efficiently. Therefore, join processing serves as a good example to discuss the salient design points and implementation details of a data processing system.

In this blog post, we cut through Apache Flink’s layered architecture and take a look at its internals with a focus on how it handles joins. Specifically, I will

  • show how easy it is to join data sets using Flink’s fluent APIs,
  • discuss basic distributed join strategies, Flink’s join implementations, and its memory management,
  • talk about Flink’s optimizer that automatically chooses join strategies,
  • show some performance numbers for joining data sets of different sizes, and finally
  • briefly discuss joining of co-located and pre-sorted data sets.

Disclaimer: This blog post is exclusively about equi-joins. Whenever I say “join” in the following, I actually mean “equi-join”.

How do I join with Flink?

Flink provides fluent APIs in Java and Scala to write data flow programs. Flink’s APIs are centered around parallel data collections called data sets. Data sets are processed by applying transformations that compute new data sets. Flink’s transformations include Map and Reduce as known from MapReduce [1] but also operators for joining, co-grouping, and iterative processing. The documentation gives an overview of all available transformations [2].

Joining two Scala case class data sets is very easy as the following example shows:

// define your data types
case class PageVisit(url: String, ip: String, userId: Long)
case class User(id: Long, name: String, email: String, country: String)

// get your data from somewhere
val visits: DataSet[PageVisit] = ...
val users: DataSet[User] = ...

// filter the users data set
val germanUsers = users.filter((u) => u.country.equals("de"))
// join data sets
val germanVisits: DataSet[(PageVisit, User)] =
      // equi-join condition (PageVisit.userId = User.id)
     visits.join(germanUsers).where("userId").equalTo("id")

Flink’s APIs also allow you to:

  • apply a user-defined join function to each pair of joined elements instead of returning a ($Left, $Right) tuple,
  • select fields of pairs of joined Tuple elements (projection), and
  • define composite join keys such as .where("orderDate", "zipCode").equalTo("date", "zip").

See the documentation for more details on Flink’s join features [3].
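
As a small illustration of the first point, the following snippet (reusing the visits and germanUsers data sets from the example above) applies a user-defined join function that keeps only the user name and the visited URL for each matching pair:

// Apply a join function instead of returning (PageVisit, User) pairs
val visitedUrlsByName: DataSet[(String, String)] =
  visits.join(germanUsers)
        .where("userId").equalTo("id") { (visit, user) => (user.name, visit.url) }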

Flink uses techniques which are well known from parallel database systems to efficiently execute parallel joins. A join operator must establish all pairs of elements from its input data sets for which the join condition evaluates to true. In a standalone system, the most straightforward implementation of a join is the so-called nested-loop join which builds the full Cartesian product and evaluates the join condition for each pair of elements. This strategy has quadratic complexity and obviously does not scale to large inputs.

In a distributed system joins are commonly processed in two steps:

  1. The data of both inputs is distributed across all parallel instances that participate in the join and
  2. each parallel instance performs a standard stand-alone join algorithm on its local partition of the overall data.

The distribution of data across parallel instances must ensure that each valid join pair can be locally built by exactly one instance. For both steps, there are multiple valid strategies that can be independently picked and which are favorable in different situations. In Flink terminology, the first phase is called Ship Strategy and the second phase Local Strategy. In the following I will describe Flink’s ship and local strategies to join two data sets R and S.

Ship Strategies

Flink features two ship strategies to establish a valid data partitioning for a join:

  • the Repartition-Repartition strategy (RR) and
  • the Broadcast-Forward strategy (BF).

The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function. Each partition is assigned to exactly one parallel join instance and all data of that partition is sent to its associated instance. This ensures that all elements that share the same join key are shipped to the same parallel instance and can be locally joined. The cost of the RR strategy is a full shuffle of both data sets over the network.

The Broadcast-Forward strategy sends one complete data set (R) to each parallel instance that holds a partition of the other data set (S), i.e., each parallel instance receives the full data set R. Data set S remains local and is not shipped at all. The cost of the BF strategy depends on the size of R and the number of parallel instances it is shipped to. The size of S does not matter because S is not moved. The figure below illustrates how both ship strategies work.

The Repartition-Repartition and Broadcast-Forward ship strategies establish suitable data distributions to execute a distributed join. Depending on the operations that are applied before the join, one or even both inputs of a join are already distributed in a suitable way across parallel instances. In this case, Flink will reuse such distributions and only ship one or no input at all.

Flink’s Memory Management

Before delving into the details of Flink’s local join algorithms, I will briefly discuss Flink’s internal memory management. Data processing algorithms such as joining, grouping, and sorting need to hold portions of their input data in memory. While such algorithms perform best if there is enough memory available to hold all data, it is crucial to gracefully handle situations where the data size exceeds memory. Such situations are especially tricky in JVM-based systems such as Flink because the system needs to reliably recognize that it is short on memory. Failure to detect such situations can result in an OutOfMemoryException and kill the JVM.

Flink handles this challenge by actively managing its memory. When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays. These byte arrays are distributed as working memory to all algorithms that need to hold significant portions of data in memory. The algorithms receive their input data as Java data objects and serialize them into their working memory.

This design has several nice properties. First, the number of data objects on the JVM heap is much lower resulting in less garbage collection pressure. Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially data sets of many small elements benefit from that. Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem. The following figure illustrates Flink’s memory management.

This active memory management makes Flink extremely robust for processing very large data sets on limited memory resources while preserving all benefits of in-memory processing if data is small enough to fit in-memory. De/serializing data into and from memory has a certain cost overhead compared to simply holding all data elements on the JVM’s heap. However, Flink features efficient custom de/serializers which also allow performing certain operations, such as comparisons, directly on serialized data without deserializing the objects first.

Local Strategies

After the data has been distributed across all parallel join instances using either a Repartition-Repartition or Broadcast-Forward ship strategy, each instance runs a local join algorithm to join the elements of its local partition. Flink’s runtime features two common join strategies to perform these local joins:

  • the Sort-Merge-Join strategy (SM) and
  • the Hybrid-Hash-Join strategy (HH).

The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key. The figure below shows how the Sort-Merge-Join strategy works.

The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase. In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm's working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed. In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fits into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input. This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large probe-side input can be processed on-the-fly without materializing it. However, even if the build-side input does not fit into memory, the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.

Ship and local strategies do not depend on each other and can be independently chosen. Therefore, Flink can execute a join of two data sets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcasted, BF with S being broadcasted) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side). Each of these strategy combinations results in different execution performance depending on the data sizes and the available amount of working memory. In case of a small data set R and a much larger data set S, broadcasting R and using it as build-side input of a Hybrid-Hash-Join is usually a good choice because the much larger data set S is not shipped and not materialized (given that the hash table completely fits into memory). If both data sets are rather large or the join is performed on many parallel instances, repartitioning both inputs is a robust choice.

Flink features a cost-based optimizer which automatically chooses the execution strategies for all operators including joins. Without going into the details of cost-based optimization, this is done by computing cost estimates for execution plans with different strategies and picking the plan with the least estimated costs. Thereby, the optimizer estimates the amount of data which is shipped over the network and written to disk. If no reliable size estimates for the input data can be obtained, the optimizer falls back to robust default choices. A key feature of the optimizer is to reason about existing data properties. For example, if the data of one input is already partitioned in a suitable way, the generated candidate plans will not repartition this input. Hence, the choice of a RR ship strategy becomes more likely. The same applies for previously sorted data and the Sort-Merge-Join strategy. Flink programs can help the optimizer to reason about existing data properties by providing semantic information about user-defined functions [4]. While the optimizer is a killer feature of Flink, it can happen that a user knows better than the optimizer how to execute a specific join. Similar to relational database systems, Flink offers optimizer hints to tell the optimizer which join strategies to pick [5].
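
For illustration, a join hint in the Scala DataSet API might look like the sketch below. The smallInput and largeInput data sets and their field names are hypothetical placeholders; which hint constants are available should be checked against the documentation in [5].

// Hypothetical data sets; "id" and "key" are placeholder field names.
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

// Ask the optimizer to broadcast the (small) first input and build a hash table from it
val hinted = smallInput.join(largeInput, JoinHint.BROADCAST_HASH_FIRST)
  .where("id").equalTo("key")

// Alternatively, declare the relative input sizes and let the optimizer pick a strategy
val shortcut = largeInput.joinWithTiny(smallInput)
  .where("key").equalTo("id")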

How is Flink’s join performance?

Alright, that sounds good, but how fast are joins in Flink? Let’s have a look. We start with a benchmark of the single-core performance of Flink’s Hybrid-Hash-Join implementation and run a Flink program that executes a Hybrid-Hash-Join with parallelism 1. We run the program on an n1-standard-2 Google Compute Engine instance (2 vCPUs, 7.5GB memory) with two locally attached SSDs. We give 4GB as working memory to the join. The join program generates 1KB records for both inputs on-the-fly, i.e., the data is not read from disk. We run 1:N (Primary-Key/Foreign-Key) joins and generate the smaller input with unique Integer join keys and the larger input with randomly chosen Integer join keys that fall into the key range of the smaller input. Hence, each tuple of the larger side joins with exactly one tuple of the smaller side. The result of the join is immediately discarded. We vary the size of the build-side input from 1 million to 12 million elements (1GB to 12GB). The probe-side input is kept constant at 64 million elements (64GB). The following chart shows the average execution time of three runs for each setup.

The joins with 1 to 3 GB build side (blue bars) are pure in-memory joins. The other joins partially spill data to disk (4 to 12GB, orange bars). The results show that the performance of Flink’s Hybrid-Hash-Join remains stable as long as the hash table completely fits into memory. As soon as the hash table becomes larger than the working memory, parts of the hash table and corresponding parts of the probe side are spilled to disk. The chart shows that the performance of the Hybrid-Hash-Join gracefully decreases in this situation, i.e., there is no sharp increase in runtime when the join starts spilling. In combination with Flink’s robust memory management, this execution behavior gives smooth performance without the need for fine-grained, data-dependent memory tuning.

So, Flink’s Hybrid-Hash-Join implementation performs well on a single thread even for limited memory resources, but how good is Flink’s performance when joining larger data sets in a distributed setting? For the next experiment we compare the performance of the most common join strategy combinations, namely:

  • Broadcast-Forward, Hybrid-Hash-Join (broadcasting and building with the smaller side),
  • Repartition, Hybrid-Hash-Join (building with the smaller side), and
  • Repartition, Sort-Merge-Join

for different input size ratios:

  • 1GB : 1000GB
  • 10GB : 1000GB
  • 100GB : 1000GB
  • 1000GB : 1000GB

The Broadcast-Forward strategy is only executed for up to 10GB. Building a hash table from 100GB broadcasted data in 5GB working memory would result in spilling approximately 95GB (build input) + 950GB (probe input) in each parallel thread and require more than 8TB of local disk storage on each machine.

As in the single-core benchmark, we run 1:N joins, generate the data on-the-fly, and immediately discard the result after the join. We run the benchmark on 10 n1-highmem-8 Google Compute Engine instances. Each instance is equipped with 8 cores, 52GB RAM, 40GB of which are configured as working memory (5GB per core), and one local SSD for spilling to disk. All benchmarks are performed using the same configuration, i.e., no fine tuning for the respective data sizes is done. The programs are executed with a parallelism of 80.

As expected, the Broadcast-Forward strategy performs best for very small inputs because the large probe side is not shipped over the network and is locally joined. However, when the size of the broadcasted side grows, two problems arise: the amount of data which is shipped over the network increases, and each parallel instance has to process the full broadcasted data set. The performance of both Repartitioning strategies behaves similarly for growing input sizes, which indicates that these strategies are mainly limited by the cost of the data transfer (at most 2TB are shipped over the network and joined). Although the Sort-Merge-Join strategy shows the worst performance in all shown cases, it has a right to exist because it can nicely exploit sorted input data.

I’ve got sooo much data to join, do I really need to ship it?

We have seen that off-the-shelf distributed joins work really well in Flink. But what if your data is so huge that you do not want to shuffle it across your cluster? We recently added some features to Flink for specifying semantic properties (partitioning and sorting) on input splits and co-located reading of local input files. With these tools at hand, it is possible to join pre-partitioned data sets from your local filesystem without sending a single byte over your cluster’s network. If the input data is even pre-sorted, the join can be done as a Sort-Merge-Join without sorting, i.e., the join is essentially done on-the-fly. Exploiting co-location requires a very special setup though. Data needs to be stored on the local filesystem because HDFS does not feature data co-location and might move file blocks across data nodes. That means you need to take care of many things yourself which HDFS would otherwise do for you, including replication to avoid data loss. On the other hand, the performance gains of joining co-located and pre-sorted data sets can be quite substantial.

tl;dr: What should I remember from all of this?

  • Flink’s fluent Scala and Java APIs make joins and other data transformations a piece of cake.
  • The optimizer does the hard choices for you, but gives you control in case you know better.
  • Flink’s join implementations perform very well in memory and gracefully degrade when going to disk.
  • Due to Flink’s robust memory management, there is no need for job- or data-specific memory tuning to avoid a nasty OutOfMemoryException. It just runs out-of-the-box.

References

[1] “MapReduce: Simplified data processing on large clusters”, Dean, Ghemawat, 2004
[2] Flink 0.8.1 documentation: Data Transformations
[3] Flink 0.8.1 documentation: Joins
[4] Flink 0.9-SNAPSHOT documentation: Semantic annotations
[5] Flink 0.9-SNAPSHOT documentation: Optimizer join hints


Written by Fabian Hueske (@fhueske).

February 2015 in the Flink community

02 Mar 2015

February might be the shortest month of the year, but this does not mean that the Flink community has not been busy adding features to the system and fixing bugs. Here’s a rundown of the activity in the Flink community last month.

0.8.1 release

Flink 0.8.1 was released. This bugfix release resolves a total of 22 issues.

New committer

Max Michels has been voted a committer by the Flink PMC.

Apache SAMOA (incubating) is a distributed streaming machine learning (ML) framework with a programming abstraction for distributed streaming ML algorithms. SAMOA runs on a variety of backend engines, currently Apache Storm and Apache S4. A pull request is available at the SAMOA repository that adds a Flink adapter for SAMOA.

Flink is now integrated in bdutil, Google’s open source tool for creating and configuring (Hadoop) clusters in Google Compute Engine. Deployment of Flink clusters is now supported starting with bdutil 1.2.0.

A new blog post on Flink Streaming was published at the blog. Flink was also mentioned in several articles on the web.

The following features have now been merged into Flink’s master repository.

Gelly

Gelly, Flink’s Graph API, allows users to manipulate graph-shaped data directly. Here is, for example, a calculation of shortest paths in a graph:

Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
     .run(new SingleSourceShortestPaths<Long>(srcVertexId,
           maxIterations)).getVertices();

See more Gelly examples here.

Flink Expressions

The newly merged flink-expressions module is the first step in Flink’s roadmap towards logical queries and SQL support. Here’s a preview of how you can read two CSV files, assign a logical schema to them, and apply transformations like filters and joins using logical attributes rather than physical data types.

val customers = getCustomerDataSet(env)
 .as('id, 'mktSegment)
 .filter( 'mktSegment === "AUTOMOBILE" )

val orders = getOrdersDataSet(env)
 .filter( o => dateFormat.parse(o.orderDate).before(date) )
 .as('orderId, 'custId, 'orderDate, 'shipPrio)

val items =
 orders.join(customers)
   .where('custId === 'id)
   .select('orderId, 'orderDate, 'shipPrio)

Access to HCatalog tables

With the flink-hcatalog module, you can now conveniently access HCatalog/Hive tables. The module supports projection (selection and order of fields) and partition filters.

Access to secured YARN clusters/HDFS

With this change, users can access Kerberos-secured YARN (and HDFS) Hadoop clusters. Also, basic support for accessing secured HDFS with a standalone Flink setup is now available.

Introducing Flink Streaming

09 Feb 2015

This post is the first of a series of blog posts on Flink Streaming, the recent addition to Apache Flink that makes it possible to analyze continuous data sources in addition to static files. Flink Streaming uses the pipelined Flink engine to process data streams in real time and offers a new API, including the definition of flexible windows.

In this post, we go through an example that uses the Flink Streaming API to compute statistics on stock market data that arrive continuously and combine the stock market data with Twitter streams. See the Streaming Programming Guide for a detailed presentation of the Streaming API.

First, we read a bunch of stock price streams and combine them into one stream of market data. We apply several transformations on this market data stream, like rolling aggregations per stock. Then we emit price warning alerts when the prices are rapidly changing. Moving towards more advanced features, we compute rolling correlations between the market data streams and a Twitter stream with stock mentions.

For running the example implementation please use the 0.9-SNAPSHOT version of Flink as a dependency. The full example code base can be found here in Scala and here in Java7.

Reading from multiple inputs

First, let us create the stream of stock prices:

  1. Read a socket stream of stock prices
  2. Parse the text in the stream to create a stream of StockPrice objects
  3. Add four other sources tagged with the stock symbol.
  4. Finally, merge the streams to create a unified stream.

Reading from multiple inputs

def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  //Read from a socket stream and map it to StockPrice objects
  val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
    val split = x.split(",")
    StockPrice(split(0), split(1).toDouble)
  })

  //Generate other stock streams
  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

  //Merge all stock streams together
  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, 
    DJI_Stream, BUX_Stream)

  stockStream.print()

  env.execute("Stock stream")
}
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    //Read from a socket stream and map it to StockPrice objects
    DataStream<StockPrice> socketStockStream = env
            .socketTextStream("localhost", 9999)
            .map(new MapFunction<String, StockPrice>() {
                private String[] tokens;

                @Override
                public StockPrice map(String value) throws Exception {
                    tokens = value.split(",");
                    return new StockPrice(tokens[0],
                        Double.parseDouble(tokens[1]));
                }
            });

    //Generate other stock streams
    DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
    DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
    DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
    DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));

    //Merge all stock streams together
    DataStream<StockPrice> stockStream = socketStockStream
        .merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);

    stockStream.print();

    env.execute("Stock stream");

See here for how you can create streaming sources for Flink Streaming programs. Flink, of course, has support for reading in streams from external sources such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake of this example, the data streams are simply generated using the generateStock method:

val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")

case class StockPrice(symbol: String, price: Double)

def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
  var price = 1000.
  while (true) {
    price = price + Random.nextGaussian * sigma
    out.collect(StockPrice(symbol, price))
    Thread.sleep(Random.nextInt(200))
  }
}
private static final ArrayList<String> SYMBOLS = new ArrayList<String>(
    Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));

public static class StockPrice implements Serializable {

    public String symbol;
    public Double price;

    public StockPrice() {
    }

    public StockPrice(String symbol, Double price) {
        this.symbol = symbol;
        this.price = price;
    }

    @Override
    public String toString() {
        return "StockPrice{" +
                "symbol='" + symbol + '\'' +
                ", count=" + price +
                '}';
    }
}

public final static class StockSource implements SourceFunction<StockPrice> {

    private Double price;
    private String symbol;
    private Integer sigma;

    public StockSource(String symbol, Integer sigma) {
        this.symbol = symbol;
        this.sigma = sigma;
    }

    @Override
    public void invoke(Collector<StockPrice> collector) throws Exception {
        price = DEFAULT_PRICE;
        Random random = new Random();

        while (true) {
            price = price + random.nextGaussian() * sigma;
            collector.collect(new StockPrice(symbol, price));
            Thread.sleep(random.nextInt(200));
        }
    }
}

To read from the text socket stream, please make sure that you have a socket running. For the sake of the example, executing the following command in a terminal does the job. You can get netcat here if it is not available on your machine.

nc -lk 9999

If we execute the program from our IDE, we see the stock prices being generated in the output:

INFO    Job execution switched to status RUNNING.
INFO    Socket Stream(1/1) switched to SCHEDULED 
INFO    Socket Stream(1/1) switched to DEPLOYING
INFO    Custom Source(1/1) switched to SCHEDULED 
INFO    Custom Source(1/1) switched to DEPLOYING
…
1> StockPrice{symbol='SPX', count=1011.3405732645239}
2> StockPrice{symbol='SPX', count=1018.3381290039248}
1> StockPrice{symbol='DJI', count=1036.7454894073978}
3> StockPrice{symbol='DJI', count=1135.1170217478427}
3> StockPrice{symbol='BUX', count=1053.667523187687}
4> StockPrice{symbol='BUX', count=1036.552601487263}

Window aggregations

We first compute aggregations on time-based windows of the data. Flink provides flexible windowing semantics where windows can also be defined based on a count of records or any custom user-defined logic.

We partition our stream into windows of 10 seconds and slide the window every 5 seconds. We compute three statistics every 5 seconds: the minimum price of all stocks, the maximum price per stock, and the mean stock price (using a map window function). Aggregations and groupings can be performed on named fields of POJOs, making the code more readable.

Basic windowing aggregations

//Define the desired time window
val windowedStream = stockStream
  .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

//Compute some simple statistics on a rolling window
val lowest = windowedStream.minBy("price")
val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _)

//Compute the mean of a window
def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
  if (ts.nonEmpty) {
    out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
  }
}
//Define the desired time window
WindowedDataStream<StockPrice> windowedStream = stockStream
    .window(Time.of(10, TimeUnit.SECONDS))
    .every(Time.of(5, TimeUnit.SECONDS));

//Compute some simple statistics on a rolling window
DataStream<StockPrice> lowest = windowedStream.minBy("price").flatten();
DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol")
    .maxBy("price").flatten();
DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol")
    .mapWindow(new WindowMean()).flatten();

//Compute the mean of a window
public final static class WindowMean implements 
    WindowMapFunction<StockPrice, StockPrice> {

    private Double sum = 0.0;
    private Integer count = 0;
    private String symbol = "";

    @Override
    public void mapWindow(Iterable<StockPrice> values, Collector<StockPrice> out) 
        throws Exception {

        if (values.iterator().hasNext()) {
            for (StockPrice sp : values) {
                sum += sp.price;
                symbol = sp.symbol;
                count++;
            }
            out.collect(new StockPrice(symbol, sum / count));
        }
    }
}

Note that to print a windowed stream, one has to flatten it first, thus getting rid of the windowing logic. For example, execute maxByStock.flatten().print() to print the stream of maximum prices of the time windows by stock. For Scala, flatten() is called implicitly when needed.

Data-driven windows

The most interesting event in the stream is when the price of a stock is changing rapidly. We can send a warning when a stock price changes more than 5% since the last warning. To do that, we use a delta-based window, providing a threshold for when the computation will be triggered, a function to compute the difference, and a default value with which the first record is compared. We also create a Count data type to count the warnings every 30 seconds.

Data-driven windowing semantics

case class Count(symbol: String, count: Int)
val defaultPrice = StockPrice("", 1000)

//Use delta policy to create price change warnings
val priceWarnings = stockStream.groupBy("symbol")
  .window(Delta.of(0.05, priceChange, defaultPrice))
  .mapWindow(sendWarning _)

//Count the number of warnings every half a minute
val warningsPerStock = priceWarnings.map(Count(_, 1))
  .groupBy("symbol")
  .window(Time.of(30, SECONDS))
  .sum("count")

def priceChange(p1: StockPrice, p2: StockPrice): Double = {
  Math.abs(p1.price / p2.price - 1)
}

def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
  if (ts.nonEmpty) out.collect(ts.head.symbol)
}
private static final Double DEFAULT_PRICE = 1000.;
private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);

//Use delta policy to create price change warnings
DataStream<String> priceWarnings = stockStream.groupBy("symbol")
    .window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
        @Override
        public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
            return Math.abs(oldDataPoint.price - newDataPoint.price);
        }
    }, DEFAULT_STOCK_PRICE))
.mapWindow(new SendWarning()).flatten();

//Count the number of warnings every half a minute
DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
    @Override
    public Count map(String value) throws Exception {
        return new Count(value, 1);
    }
}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();

public static class Count implements Serializable {
    public String symbol;
    public Integer count;

    public Count() {
    }

    public Count(String symbol, Integer count) {
        this.symbol = symbol;
        this.count = count;
    }

    @Override
    public String toString() {
        return "Count{" +
                "symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}

public static final class SendWarning implements WindowMapFunction<StockPrice, String> {
    @Override
    public void mapWindow(Iterable<StockPrice> values, Collector<String> out) 
        throws Exception {

        if (values.iterator().hasNext()) {
            out.collect(values.iterator().next().symbol);
        }
    }
}

Combining with a Twitter stream

Next, we will read a Twitter stream and correlate it with our stock price stream. Flink has support for connecting to Twitter's API, but for the sake of this example we generate dummy tweet data.

Social media analytics

//Read a stream of tweets
val tweetStream = env.addSource(generateTweets _)

//Extract the stock symbols
val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
  .map(_.toUpperCase())
  .filter(symbols.contains(_))

//Count the extracted symbols
val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
  .groupBy("symbol")
  .window(Time.of(30, SECONDS))
  .sum("count")

def generateTweets(out: Collector[String]) = {
  while (true) {
    val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
    out.collect(s.mkString(" "))
    Thread.sleep(Random.nextInt(500))
  }
}
//Read a stream of tweets
DataStream<String> tweetStream = env.addSource(new TweetSource());

//Extract the stock symbols
DataStream<String> mentionedSymbols = tweetStream.flatMap(
    new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(word.toUpperCase());
        }
    }
}).filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return SYMBOLS.contains(value);
    }
});

//Count the extracted symbols
DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
    @Override
    public Count map(String value) throws Exception {
        return new Count(value, 1);
    }
}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();

public static final class TweetSource implements SourceFunction<String> {
    Random random;
    StringBuilder stringBuilder;

    @Override
    public void invoke(Collector<String> collector) throws Exception {
        random = new Random();
        stringBuilder = new StringBuilder();

        while (true) {
            stringBuilder.setLength(0);
            for (int i = 0; i < 3; i++) {
                stringBuilder.append(" ");
                stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
            }
            collector.collect(stringBuilder.toString());
            Thread.sleep(500);
        }

    }
}

Streaming joins

Finally, we join real-time tweets and stock prices and compute a rolling correlation between the number of price warnings and the number of mentions of a given stock in the Twitter stream. As both of these data streams are potentially infinite, we apply the join on a 30-second window.

Streaming joins

//Join warnings and parsed tweets
val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
  .onWindow(30, SECONDS)
  .where("symbol")
  .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }

val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
  .mapWindow(computeCorrelation _)

rollingCorrelation print

//Compute rolling correlation
def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
  if (input.nonEmpty) {
    val var1 = input.map(_._1)
    val mean1 = average(var1)
    val var2 = input.map(_._2)
    val mean2 = average(var2)

    val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
    val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
    val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))

    out.collect(cov / (d1 * d2))
  }
}
//Join warnings and parsed tweets
DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock
    .join(tweetsPerStock)
    .onWindow(30, TimeUnit.SECONDS)
    .where("symbol")
    .equalTo("symbol")
    .with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
            return new Tuple2<Integer, Integer>(first.count, second.count);
            }
    });

//Compute rolling correlation
DataStream<Double> rollingCorrelation = tweetsAndWarning
    .window(Time.of(30, TimeUnit.SECONDS))
    .mapWindow(new WindowCorrelation());

rollingCorrelation.print();

public static final class WindowCorrelation
    implements WindowMapFunction<Tuple2<Integer, Integer>, Double> {

    private Integer leftSum;
    private Integer rightSum;
    private Integer count;

    private Double leftMean;
    private Double rightMean;

    private Double cov;
    private Double leftSd;
    private Double rightSd;

    @Override
    public void mapWindow(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) 
        throws Exception {

        leftSum = 0;
        rightSum = 0;
        count = 0;

        cov = 0.;
        leftSd = 0.;
        rightSd = 0.;

        //compute mean for both sides, save count
        for (Tuple2<Integer, Integer> pair : values) {
            leftSum += pair.f0;
            rightSum += pair.f1;
            count++;
        }

        leftMean = leftSum.doubleValue() / count;
        rightMean = rightSum.doubleValue() / count;

        //compute covariance & std. deviations
        for (Tuple2<Integer, Integer> pair : values) {
            cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
        }

        for (Tuple2<Integer, Integer> pair : values) {
            leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
            rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
        }
        leftSd = Math.sqrt(leftSd);
        rightSd = Math.sqrt(rightSd);

        out.collect(cov / (leftSd * rightSd));
    }
}

Other things to try

For a full feature overview please check the Streaming Guide, which describes all the available API features. You are very welcome to try out our features for different use cases; we are looking forward to your experiences. Feel free to contact us.

Upcoming for streaming

There are some aspects of Flink Streaming that are subject to change by the next release, making this application look even nicer.

Stay tuned for later blog posts on how Flink Streaming works internally, fault tolerance, and performance measurements!


January 2015 in the Flink community

04 Feb 2015

Happy 2015! Here is a (hopefully digestible) summary of what happened last month in the Flink community.

0.8.0 release

Flink 0.8.0 was released. See here for the release notes.

The community has published a roadmap for 2015 on the Flink wiki. Check it out to see what is coming up in Flink, and pick up an issue to contribute!

Scaling ALS

Flink committers employed at data Artisans published a blog post on how they scaled matrix factorization with Flink and Google Compute Engine to matrices with 28 billion elements.

Articles in the press

The Apache Software Foundation announced Flink as a Top-Level Project. The announcement was picked up by the media, e.g., here, here, and here.

Hadoop Summit

A submitted abstract on Flink Streaming won the community vote at “The Future of Hadoop” track.

Meetups and talks

Flink was presented at the Paris Hadoop User Group, the Bay Area Hadoop User Group, the Apache Tez User Group, and FOSDEM 2015. The January Flink meetup in Berlin had talks on recent community updates and new features.

Notable code contributions

Note: Code contributions listed here may not be part of a release or even the Flink master repository yet.

Using off-heap memory

This pull request enables Flink to use off-heap memory for its internal memory uses (sort, hash, caching of intermediate data sets).

Gelly, Flink’s Graph API

This pull request introduces Gelly, Flink’s brand new Graph API. Gelly offers a native graph programming abstraction with functionality for vertex-centric programming, as well as available graph algorithms. See this slide set for an overview of Gelly.

Semantic annotations

Semantic annotations are a powerful mechanism to expose information about the behavior of Flink functions to Flink’s optimizer. The optimizer can leverage this information to generate more efficient execution plans. For example the output of a Reduce operator that groups on the second field of a tuple is still partitioned on that field if the Reduce function does not modify the value of the second field. By exposing this information to the optimizer, the optimizer can generate plans that avoid expensive data shuffling and reuse the partitioned output of Reduce. Semantic annotations can be defined for most data types, including (nested) tuples and POJOs. See the snapshot documentation for details (not online yet).
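
As a rough, hypothetical sketch of the idea (the input data set is made up, and the withForwardedFields call follows the snapshot documentation mentioned above, so treat the exact API as an assumption), a Scala program could declare that a Reduce function forwards its grouping field unchanged:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical input: (value, key) tuples, grouped and summed per key
val input: DataSet[(Double, String)] = env.fromElements((1.0, "a"), (2.0, "a"), (3.0, "b"))

val sums = input
  .groupBy(1)                              // group on the second tuple field
  .reduce((a, b) => (a._1 + b._1, a._2))   // the second field is forwarded unmodified
  .withForwardedFields("_2")               // expose this fact to the optimizer

// A later groupBy or join on the second field can now reuse the partitioning
// established by the reduce instead of shuffling the data again.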

New YARN client

The improved YARN client of Flink now allows users to deploy Flink on YARN for executing a single job. Older versions only supported a long-running YARN session. The code of the YARN client has been refactored to provide an (internal) Java API for controlling YARN clusters more easily.

Apache Flink 0.8.0 available

21 Jan 2015

We are pleased to announce the availability of Flink 0.8.0. This release includes new user-facing features as well as performance and bug fixes, extends the support for filesystems and introduces the Scala API and flexible windowing semantics for Flink Streaming. A total of 33 people have contributed to this release, a big thanks to all of them!

Download Flink 0.8.0

See the release changelog

Overview of major new features

  • Extended filesystem support: The former DistributedFileSystem interface has been generalized to HadoopFileSystem, which now supports all subclasses of org.apache.hadoop.fs.FileSystem. This allows users to use all file systems supported by Hadoop with Apache Flink. See connecting to other systems

  • Streaming Scala API: As an alternative to the existing Java API, streaming programs can now also be written in Scala. The Java and Scala APIs now have the same syntax and transformations and will be kept in sync in every future release.

  • Streaming windowing semantics: The new windowing API offers an expressive way to define custom logic for triggering the execution of a stream window and removing elements. The new features include, among others, out-of-the-box support for windows based on logical or physical time and on data-driven properties of the events themselves. Read more here

  • Mutable and immutable objects in runtime: All Flink versions before 0.8.0 always passed the same objects to user-defined functions. This is a common performance optimization, also used in other systems such as Hadoop. However, it is error-prone for new users because one has to carefully check that references to the object aren’t kept in the user function. Starting from 0.8.0, Flink allows users to configure a mode that disables this mechanism.

  • Performance and usability improvements: The new Apache Flink 0.8.0 release brings several new features which will significantly improve the performance and the usability of the system. Amongst others, these features include:

    • Improved input split assignment which maximizes computation locality
    • Smart broadcasting mechanism which minimizes network I/O
    • Custom partitioners which let the user control how the data is partitioned within the cluster. This helps to prevent data skew and allows users to implement highly efficient algorithms.
    • coGroup operator now supports group sorting for its inputs
  • Kryo is the new fallback serializer: Apache Flink has a sophisticated type analysis and serialization framework that is able to handle commonly used types very efficiently. In addition to that, there is a fallback serializer for types which are not supported. Older versions of Flink used the reflective Avro serializer for that purpose. With this release, Flink uses the powerful Kryo and twitter-chill libraries to support types such as Java Collections and Scala-specific types.

  • Hadoop 2.2.0+ is now the default Hadoop dependency: With Flink 0.8.0 we made the “hadoop2” build profile the default build for Flink. This means that all users using Hadoop 1 (0.2X or 1.2.X versions) have to specify version “0.8.0-hadoop1” in their pom files.

  • HBase module updated: The HBase version has been updated to 0.98.6.1. Also, HBase is now available for both the Hadoop 1 and Hadoop 2 profiles of Flink.
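
As referenced in the custom partitioners item above, here is a minimal sketch of how a custom Partitioner can be plugged into a DataSet program. The partitioning logic and data are made up for illustration:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CustomPartitionerExample {

  // Routes records by the first letter of the key; a hypothetical way to
  // spread a skewed alphabetical key space more evenly across partitions.
  public static class FirstLetterPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
      return Character.toLowerCase(key.charAt(0)) % numPartitions;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, Integer>> data = env.fromElements(
        new Tuple2<String, Integer>("apple", 1),
        new Tuple2<String, Integer>("avocado", 2),
        new Tuple2<String, Integer>("banana", 3));

    // Repartition by field 0 using the custom partitioner, then print the result.
    data.partitionCustom(new FirstLetterPartitioner(), 0)
        .print();

    // In this release, print() registers a sink and execute() runs the program.
    env.execute("Custom partitioner sketch");
  }
}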

Contributors

  • Marton Balassi
  • Daniel Bali
  • Carsten Brandt
  • Moritz Borgmann
  • Stefan Bunk
  • Paris Carbone
  • Ufuk Celebi
  • Nils Engelbach
  • Stephan Ewen
  • Gyula Fora
  • Gabor Hermann
  • Fabian Hueske
  • Vasiliki Kalavri
  • Johannes Kirschnick
  • Aljoscha Krettek
  • Suneel Marthi
  • Robert Metzger
  • Felix Neutatz
  • Chiwan Park
  • Flavio Pompermaier
  • Mingliang Qi
  • Shiva Teja Reddy
  • Till Rohrmann
  • Henry Saputra
  • Kousuke Saruta
  • Chesnay Schepler
  • Erich Schubert
  • Peter Szabo
  • Jonas Traub
  • Kostas Tzoumas
  • Timo Walther
  • Daniel Warneke
  • Chen Xu

December 2014 in the Flink community

06 Jan 2015

This is the first blog post of a “newsletter”-like series in which we give a summary of the monthly activity in the Flink community. As the Flink project grows, this can serve as a “tl;dr” for people who are not following the Flink dev and user mailing lists, or who are simply overwhelmed by the traffic.

The biggest news is that the Apache board approved Flink as a top-level Apache project! The Flink team is working closely with the Apache press team for an official announcement, so stay tuned for details!

The Flink website got a total make-over, both in terms of appearance and content.

A new IRC channel called #flink was created at irc.freenode.org. An easy way to access the IRC channel is through the web client. Feel free to stop by to ask anything or share your ideas about Apache Flink!

Meetups and Talks

Apache Flink was presented at the Amsterdam Hadoop User Group.

Notable code contributions

Note: Code contributions listed here may not be part of a release or even the current snapshot yet.

Streaming Scala API

The Flink Streaming Java API recently got its Scala counterpart. Once merged, Flink Streaming users can use both Scala and Java for their development. The Flink Streaming Scala API is built as a thin layer on top of the Java API, making sure that the APIs are kept easily in sync.

Intermediate datasets

This pull request introduces a major change in the Flink runtime. Currently, the Flink runtime is based on the notion of operators that exchange data through channels. With the PR, intermediate data sets that are produced by operators become first-class citizens in the runtime. While this does not have any user-facing impact yet, it lays the groundwork for a slew of future features such as blocking execution, fine-grained fault-tolerance, and more efficient data sharing between cluster and client.

Configurable execution mode

This pull request allows the user to change the object-reuse behaviour. Before this pull request, some operations would reuse objects passed to the user function while others would always create new objects. The change introduces a system-wide switch and changes all operators to consistently either reuse objects or always create new ones.
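
As a rough sketch of what this switch looks like from the API side (assuming the ExecutionConfig methods enableObjectReuse() and disableObjectReuse(); the exact names may differ from the merged pull request):

import org.apache.flink.api.java.ExecutionEnvironment;

public class ObjectReuseSwitch {
  public static void main(String[] args) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Opt in to object reuse: the runtime may hand the same object instances
    // to user functions repeatedly, avoiding allocations but requiring that
    // user code does not keep references to those objects across invocations.
    env.getConfig().enableObjectReuse();

    // The safe default: every function invocation receives fresh objects.
    // env.getConfig().disableObjectReuse();
  }
}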

Distributed Coordination via Akka

Another major change is a complete rewrite of the JobManager / TaskManager components in Scala. In addition to that, the old RPC service was replaced by Actors, using the Akka framework.

Sorting of very large records

Flink’s internal sort algorithms were improved to better handle large records (multiple hundreds of megabytes or larger). Previously, the system would in some cases hold instances of multiple large records, resulting in high memory consumption and JVM heap thrashing. Through this fix, large records are streamed through the operators, reducing memory consumption and GC pressure. The system now requires much less memory to support algorithms that work on such large records.

Kryo Serialization as the new default fallback

Flink’s built-in type serialization framework handles all common types very efficiently. Prior versions used Avro to serialize types that the built-in framework could not handle. Flink’s serialization system has improved a lot over time and by now surpasses the capabilities of Avro in many cases. Kryo now serves as the default fallback serialization framework, supporting a much broader range of types.

Hadoop FileSystem support

This change permits users to use all file systems supported by Hadoop with Flink. In practice this means that users can use Flink with Tachyon, Google Cloud Storage (including out-of-the-box Flink YARN support on Google Compute Cloud), FTP, and all other file system implementations available for Hadoop.
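
In practice, this usually just means pointing Flink at a URI whose scheme is handled by a Hadoop FileSystem implementation that is configured and on the classpath. A minimal sketch (the scheme, bucket, and paths are made up for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class HadoopFileSystemPaths {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Any scheme backed by an org.apache.hadoop.fs.FileSystem implementation
    // can be used here, for example gs:// or tachyon://.
    DataSet<String> lines = env.readTextFile("gs://my-bucket/input/data.txt");

    lines.writeAsText("gs://my-bucket/output/");
    env.execute("Reading and writing through a Hadoop file system");
  }
}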

Heading to the 0.8.0 release

The community is working hard together with the Apache infra team to migrate the Flink infrastructure to a top-level project. At the same time, the Flink community is working on the Flink 0.8.0 release which should be out very soon.

Hadoop Compatibility in Flink

18 Nov 2014

Apache Hadoop is an industry standard for scalable analytical data processing. Many data analysis applications have been implemented as Hadoop MapReduce jobs and run in clusters around the world. Apache Flink can be an alternative to MapReduce and improves it in many dimensions. Among other features, Flink provides much better performance and offers APIs in Java and Scala, which are very easy to use. Similar to Hadoop, Flink’s APIs provide interfaces for Mapper and Reducer functions, as well as Input- and OutputFormats along with many more operators. While being conceptually equivalent, Hadoop’s MapReduce and Flink’s interfaces for these functions are unfortunately not source compatible.

Flink’s Hadoop Compatibility Package

To close this gap, Flink provides a Hadoop Compatibility package to wrap functions implemented against Hadoop’s MapReduce interfaces and embed them in Flink programs. This package was developed as part of a Google Summer of Code 2014 project.

With the Hadoop Compatibility package, you can reuse all your Hadoop

  • InputFormats (mapred and mapreduce APIs)
  • OutputFormats (mapred and mapreduce APIs)
  • Mappers (mapred API)
  • Reducers (mapred API)

in Flink programs without changing a line of code. Moreover, Flink also natively supports all Hadoop data types (Writable and WritableComparable).

The following code snippet shows a simple Flink WordCount program that solely uses Hadoop data types, InputFormat, OutputFormat, Mapper, and Reducer functions.

// Definition of Hadoop Mapper function
public class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> { ... }
// Definition of Hadoop Reducer function
public class Counter implements Reducer<Text, LongWritable, Text, LongWritable> { ... }

public static void main(String[] args) throws Exception {
  final String inputPath = args[0];
  final String outputPath = args[1];

  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // Setup Hadoop’s TextInputFormat
  HadoopInputFormat<LongWritable, Text> hadoopInputFormat = 
      new HadoopInputFormat<LongWritable, Text>(
        new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
  TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

  // Read a DataSet with the Hadoop InputFormat
  DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
  DataSet<Tuple2<Text, LongWritable>> words = text
    // Wrap Tokenizer Mapper function
    .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
    .groupBy(0)
    // Wrap Counter Reducer function (used as Reducer and Combiner)
    .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
      new Counter(), new Counter()));

  // Setup Hadoop’s TextOutputFormat
  HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
    new HadoopOutputFormat<Text, LongWritable>(
      new TextOutputFormat<Text, LongWritable>(), new JobConf());
  hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
  TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));

  // Output & Execute
  words.output(hadoopOutputFormat);
  env.execute("Hadoop Compat WordCount");
}

As you can see, Flink represents Hadoop key-value pairs as Tuple2<key, value> tuples. Note that the program uses Flink’s groupBy() transformation to group data on the key field (field 0 of the Tuple2<key, value>) before it is given to the Reducer function. At the moment, the compatibility package does not evaluate custom Hadoop partitioners, sorting comparators, or grouping comparators.

Hadoop functions can be used at any position within a Flink program and can of course be mixed with native Flink functions. This means that instead of assembling a workflow of Hadoop jobs in an external driver method or using a workflow scheduler such as Apache Oozie, you can implement an arbitrarily complex Flink program consisting of multiple Hadoop Input- and OutputFormats, Mapper and Reducer functions. When executing such a Flink program, data will be pipelined between your Hadoop functions and will not be written to HDFS just for the purpose of data exchange.
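
For example, instead of writing the words data set directly, a native Flink filter could be applied to the output of the wrapped Hadoop Reducer first. This sketch continues the snippet above; the threshold is an arbitrary illustration:

// Keep only words that occur more than 10 times, using a native Flink function.
DataSet<Tuple2<Text, LongWritable>> frequentWords = words
  .filter(new FilterFunction<Tuple2<Text, LongWritable>>() {
    @Override
    public boolean filter(Tuple2<Text, LongWritable> wordCount) {
      return wordCount.f1.get() > 10;
    }
  });

// Write the filtered result with the Hadoop OutputFormat as before.
frequentWords.output(hadoopOutputFormat);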

What comes next?

While the Hadoop compatibility package is already very useful, we are currently working on a dedicated Hadoop Job operation to embed and execute Hadoop jobs as a whole in Flink programs, including their custom partitioning, sorting, and grouping code. With this feature, you will be able to chain multiple Hadoop jobs, mix them with Flink functions, and combine them with other operations such as Spargel operations (Pregel/Giraph-style jobs).

Summary

Flink lets you reuse a lot of the code you wrote for Hadoop MapReduce, including all data types, all Input- and OutputFormats, and the Mappers and Reducers of the mapred API. Hadoop functions can be used within Flink programs and mixed with all other Flink functions. Due to Flink’s pipelined execution, Hadoop functions can be arbitrarily assembled without data exchange via HDFS. Moreover, the Flink community is currently working on a dedicated Hadoop Job operation to support the execution of Hadoop jobs as a whole.

If you want to use Flink’s Hadoop compatibility package, check out our documentation.


Written by Fabian Hueske (@fhueske).

Apache Flink 0.7.0 available

04 Nov 2014

We are pleased to announce the availability of Flink 0.7.0. This release includes new user-facing features as well as performance and bug fixes, brings the Scala and Java APIs in sync, and introduces Flink Streaming. A total of 34 people have contributed to this release, a big thanks to all of them!

Download Flink 0.7.0 here

See the release changelog here

Overview of major new features

Flink Streaming: The gem of the 0.7.0 release is undoubtedly Flink Streaming. Currently available in alpha, Flink Streaming provides a Java API on top of Apache Flink that can consume streaming data sources (e.g., Apache Kafka, Apache Flume, and others) and process them in real time. A dedicated blog post on Flink Streaming and its performance is coming up soon. You can check out the Streaming programming guide here.
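
As a very rough illustration of the programming model (this sketch follows the general DataStream style; exact package and method names may differ in the alpha API):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Consume a text stream from a socket; a Kafka or Flume source could be used instead.
    DataStream<String> lines = env.socketTextStream("localhost", 9999);

    // Split each line into words and print them as they arrive.
    lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String line, Collector<String> out) {
        for (String word : line.split(" ")) {
          out.collect(word);
        }
      }
    }).print();

    env.execute("Streaming sketch");
  }
}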

New Scala API: The Scala API has been completely rewritten. The Java and Scala APIs now have the same syntax and transformations and will be kept in sync in all future releases. See the new Scala API here.

Logical key expressions: You can now specify grouping and joining keys with logical names for member variables of POJO data types. For example, you can join two data sets as persons.join(cities).where("zip").equalTo("zipcode"). Read more here.

Hadoop MapReduce compatibility: You can run unmodified Hadoop Mappers and Reducers (mapred API) in Flink, use all Hadoop data types, and read data with all Hadoop InputFormats.

Collection-based execution backend: The collection-based execution backend enables you to execute a Flink job as a simple Java collections program, completely bypassing the Flink runtime and optimizer. This feature is extremely useful for prototyping and for embedding Flink jobs in projects in a very lightweight manner.
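
A minimal sketch of how this can be used (assuming the createCollectionsEnvironment() factory method of the Java API):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectionBackendSketch {
  public static void main(String[] args) throws Exception {
    // Runs the program on plain Java collections in the local JVM,
    // bypassing the Flink runtime and optimizer entirely.
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    DataSet<Integer> squares = env.fromElements(1, 2, 3, 4)
        .map(new MapFunction<Integer, Integer>() {
          @Override
          public Integer map(Integer value) {
            return value * value;
          }
        });

    // print() registers a sink; execute() runs the program.
    squares.print();
    env.execute("Collection-based execution");
  }
}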

Record API deprecated: The (old) Stratosphere Record API has been marked as deprecated and is planned for removal in the 0.9.0 release.

BLOB service: This release contains a new service to distribute jar files and other binary data among the JobManager, TaskManagers and the client.

Intermediate data sets: A major rewrite of the system internals introduces intermediate data sets as first class citizens. The internal state machine that tracks the distributed tasks has also been completely rewritten for scalability. While this is not visible as a user-facing feature yet, it is the foundation for several upcoming exciting features.

Note: Currently, there is limited support for Java 8 lambdas when compiling and running from an IDE. The problem is due to type erasure and whether Java compilers retain type information. We are currently working with the Eclipse and OpenJDK communities to resolve this.

Contributors

  • Tamas Ambrus
  • Mariem Ayadi
  • Marton Balassi
  • Daniel Bali
  • Ufuk Celebi
  • Hung Chang
  • David Eszes
  • Stephan Ewen
  • Judit Feher
  • Gyula Fora
  • Gabor Hermann
  • Fabian Hueske
  • Vasiliki Kalavri
  • Kristof Kovacs
  • Aljoscha Krettek
  • Sebastian Kruse
  • Sebastian Kunert
  • Matyas Manninger
  • Robert Metzger
  • Mingliang Qi
  • Till Rohrmann
  • Henry Saputra
  • Chesnay Schepler
  • Moritz Schubotz
  • Hung Sendoh Chang
  • Peter Szabo
  • Jonas Traub
  • Fabian Tschirschnitz
  • Artem Tsikiridis
  • Kostas Tzoumas
  • Timo Walther
  • Daniel Warneke
  • Tobias Wiens
  • Yingjun Wu