////////////////////
Licensed to Cloudera, Inc. under one or more contributor license
agreements. See the NOTICE file distributed with this work for
additional information regarding copyright ownership. Cloudera, Inc.
licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the
License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License.
////////////////////

== Abstract

[abstract]
**************************************************
Flume is a distributed, reliable, and available service for
efficiently collecting, aggregating, and moving large amounts of log
data. It has a simple and flexible architecture based on streaming
data flows. It is robust and fault tolerant, with tunable reliability
mechanisms and many failover and recovery mechanisms. The system is
centrally managed, which allows for intelligent dynamic management.
It uses a simple extensible data model that allows for online
analytic applications.
**************************************************

[[Introduction]]
== Introduction

Flume is a distributed, reliable, and available service for
efficiently moving large amounts of data soon after the data is
produced. This release provides a scalable conduit for moving data
around a cluster, as well as reliable logging.

The primary use case for Flume is as a logging system that gathers a
set of log files from every machine in a cluster and aggregates them
to a centralized persistent store such as the Hadoop Distributed File
System (HDFS).

The system was designed with these four key goals in mind:

* Reliability
* Scalability
* Manageability
* Extensibility

This section provides a high-level overview of Flume's architecture
and describes how the four design goals are achieved.

=== Architecture

Flume's architecture is simple, robust, and flexible. The main
abstraction in Flume is a stream-oriented *data flow*. A data flow
describes the way a single stream of data is transferred and
processed from its point of generation to its eventual destination.
Data flows are composed of *logical nodes* that can transform or
aggregate the events they receive. Logical nodes are wired together
in chains to form a data flow; the way in which a node is wired is
called its *configuration*.

Controlling all this is the *Flume Master*, a separate service with
knowledge of all the physical and logical nodes in a Flume
installation. The Master assigns configurations to logical nodes and
is responsible for relaying configuration updates made by the user to
all affected logical nodes. In turn, the logical nodes periodically
contact the Master to share monitoring information and check for
updates to their configuration.
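For example, once a Master is running, an administrator can assign a
complete configuration to a logical node with a single command from
the Flume shell. This is only a sketch - the node name +agentA+ and
the file path are hypothetical, and the shell and its commands are
covered in detail in later sections:

----
connect localhost:35873
exec config agentA 'tail("/var/log/messages")' 'console'
----

Here the +tail+ source follows a local log file and the +console+
sink simply prints each event; the node picks up the new
configuration on its next heartbeat to the Master.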
["graphviz", "architecture.png"] --------------------------------------------------------------------- digraph multi_collector { rankdir=LR; node [shape=none] "agent tier" -> "collector tier" -> "storage tier" node [shape=record]; { rank=same; "agent tier"; "agent A"; "agent B"; "agent C"; "agent D"; "agent E"; "agent F"; } { rank=same; "storage tier"; "master"; "HDFS" } { rank=same; "collector tier"; "collector" } collector [label=" collector A | collector B | collector C"]; master -> "agent A" [style="dashed" arrowhead="none"] ; master -> "agent B" [style="dashed" arrowhead="none"] ; master -> "agent C" [style="dashed" arrowhead="none"] ; master -> "agent D" [style="dashed" arrowhead="none"] ; master -> "agent E" [style="dashed" arrowhead="none"] ; master -> "agent F" [style="dashed" arrowhead="none"] ; master -> collector:A [style="dashed" arrowhead="none"] ; master -> collector:B [style="dashed" arrowhead="none"] ; master -> collector:C [style="dashed" arrowhead="none"] ; collector:A -> HDFS [style="bold"]; collector:B -> HDFS [style="bold"]; collector:C -> HDFS [style="bold"]; "agent A" -> collector:A [style="bold"]; "agent B" -> collector:A [style="bold"]; "agent C" -> collector:B [style="bold"]; "agent D" -> collector:B [style="bold"]; "agent E" -> collector:C [style="bold"]; "agent F" -> collector:C [style="bold"]; } --------------------------------------------------------------------- // I think this should just use node here and introduce logical nodes // and physical nodes later The graph above shows a typical deployment of Flume that collects log data from a set of application servers. The deployment consists of a number of _logical nodes_, arranged into three tiers. The first tier is the _agent_ tier. Agent nodes are typically installed on the machines that generate the logs and are your data's initial point of contact with Flume. They forward data to the next tier of _collector nodes_, which aggregate the separate data flows and forward them to the final _storage tier_. For example, the agents could be machines listening for +syslog+ data or monitoring the logs of a service such as a web server or the Hadoop JobTracker. The agents produce streams of data that are sent to the collectors; the collectors then aggregate the streams into larger streams which can be written efficiently to a storage tier such as HDFS. Logical nodes are a very flexible abstraction. Every logical node has just two components - a _source_ and a _sink_. The source tells a logical node where to collect data, and the sink tells it where to send the data. The only difference between two logical nodes is how the source and sink are configured. Both source and sink can additionally be configured with _decorators_ which perform some simple processing on data as it passes through. In the previous example, the collector and the agents are _running the same node software_. The Master assigns a configuration to each logical node at run-time - all components of a node's configuration are instantiated dynamically at run-time, and therefore configurations can be changed many times throughout the lifetime of a Flume service without having to restart any Java processes or log into the machines themselves. In fact, logical nodes themselves can be created and deleted dynamically. The source, sink, and optional decorators are a powerful set of primitives. 
Flume uses these primitives to provide per-flow data properties (for
example, durability guarantees, compression, or batching), to compute
event metadata, or even to generate new events that are inserted into
the data flow.

A logical node can also send data downstream to several other logical
nodes. This allows for multiple flows, and each subflow can be
configured differently. For example, one flow can be a collection
path that delivers data reliably to a persistent store, while another
branch computes lightweight analytics to be delivered to an alerting
system.

.Logical and Physical Nodes
***********************
It's important to make the distinction between _logical nodes_ and
_physical nodes_. A physical node corresponds to a single Java
process running in a single JVM instance on one machine. Usually
there is just one physical node per machine. Physical nodes act as
containers for _logical nodes_, which are wired together to form data
flows. Each physical node can play host to many logical nodes, and
arbitrates the assignment of machine resources between them. So,
although the agents and the collectors in the preceding example are
_logically_ separate processes, they could be running on the same
_physical_ node. Flume gives users the flexibility to specify where
computation and data transfer are done. The remainder of this guide
describes logical nodes unless stated otherwise.
***********************

=== Reliability

Reliability - the ability to continue delivering events in the face
of failures without losing data - is a vital feature of Flume. Large
distributed systems can and do suffer partial failures in many ways:
physical hardware can fail, resources such as network bandwidth or
memory can become scarce, or software can crash or run slowly. Flume
emphasizes fault tolerance as a core design principle and keeps
running and collecting data even when many components have failed.

Flume can guarantee that all data received by an agent node will
eventually make it to the collector at the end of its flow, as long
as the agent node keeps running. That is, data can be *reliably*
delivered to its eventual destination.

However, reliable delivery can be very resource intensive and is
often a stronger guarantee than some data sources require. Therefore,
Flume allows the user to specify, on a per-flow basis, the level of
reliability required. There are three supported reliability levels:

* End-to-end
* Store on failure
* Best effort

.A Note About Reliability
******************
Although Flume is extremely tolerant of machine, network, and
software failures, there is no such thing as '100% reliability'. If
all the machines in a Flume installation were irrevocably destroyed
in some terrible data center incident, all copies of Flume's data
would be lost and there would be no way to recover them. Therefore,
all of Flume's reliability levels make guarantees about data delivery
only 'until some maximum number of failures have occurred'. Flume's
failure modes - in terms of what can fail, and what keeps running
when it does - are described in detail later in this guide.
******************
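The data flow specification language described later exposes these
levels as different agent sink flavors: +agentE2ESink+ for
end-to-end, +agentDFOSink+ for store on failure (disk failover), and
+agentBESink+ for best effort. A sketch, with hypothetical node
names, file path, and collector host:

----
agent1 : tail("/var/log/app.log") | agentE2ESink("collector", 35853) ;
agent2 : tail("/var/log/app.log") | agentDFOSink("collector", 35853) ;
agent3 : tail("/var/log/app.log") | agentBESink("collector", 35853) ;
----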
The *end-to-end* reliability level guarantees that once Flume accepts
an event, the event will make it to the endpoint - as long as the
agent that accepted the event remains live long enough. The first
thing the agent does in this setting is write the event to disk in a
"write-ahead log" (WAL) so that, if the agent crashes and restarts,
knowledge of the event is not lost. After the event has successfully
made its way to the end of its flow, an acknowledgment is sent back
to the originating agent so that it knows it no longer needs to store
the event on disk. This reliability level can withstand any number of
failures downstream of the initial agent.

The *store on failure* reliability level requires nodes to receive an
acknowledgment only from the node one hop downstream. If the sending
node detects a failure, it stores the data on its local disk until
the downstream node is repaired or an alternate downstream
destination can be selected. While this is effective, data can be
lost if a compound or silent failure occurs.

The *best-effort* reliability level sends data to the next hop with
no attempt to confirm or retry delivery. If nodes fail, any data that
they were in the process of transmitting or receiving can be lost.
This is the weakest reliability level, but also the most lightweight.

.Eliminating Single Points of Failure
************************
Even when using the end-to-end reliability level, a Flume flow can
fail to make progress if there are no nodes available to process
events. The agents will continue to collect data, but they may not be
able to deliver it downstream until a suitable node comes online. To
maintain high availability, Flume allows flows to fail over from one
downstream node to another without any user intervention. The Flume
Master can also be replicated for high availability, which means that
both the data plane and the control plane can tolerate a number of
faults.
************************
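Failover can also be expressed directly in a node's configuration.
The following sketch is illustrative only, with hypothetical node and
collector names: the first agent uses an automatic failover chain, in
which the Master assigns an ordered list of collectors to fail over
between; the second names its primary and backup collectors
explicitly with the failover combinator:

----
agentA : tail("/var/log/app.log") | autoE2EChain ;
agentB : tail("/var/log/app.log") | < logicalSink("collectorB") ? logicalSink("collectorC") > ;
----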
=== Scalability

Scalability is the ability to increase system performance linearly -
or better - by adding more resources to the system. Flume's goal is
horizontal scalability: the ability to incrementally add more
machines to the system to increase throughput. A key performance
measure in Flume is the volume of events entering the system and
being delivered. When load increases, it is simple to add more
machines to the system to handle the increased load.

As seen in the preceding example installation, there are three
separate components of Flume that require different approaches to
scalability: the collector tier, the Master, and the storage tier.

The collector tier needs to scale in order to handle large volumes of
data coming from large numbers of agent nodes. This workload is
write-heavy and partitionable, and thus parallelizable. By adding
more machines to the collector tier, you increase the number of
agents the system can serve and its maximum available throughput.

An individual collector can usually handle many agents (up to
hundreds), because each individual agent often produces only small
amounts of log data compared to the full bandwidth available to the
collector. Therefore, Flume balances flows from agents across
different collectors. (A single flow from an agent always talks to
the same collector.) Flume uses a randomized algorithm to evenly
assign lists of collectors to flows. This automatically spreads the
load, and keeps it spread if a collector fails.

As the number of nodes in the system increases, the volume of traffic
on the control path to and from the Flume Master may become a
bottleneck. The Flume Master also supports horizontal scaling by
adding more machines, although just a small number of commodity
servers can serve a large installation of nodes. The state of the
Flume Master is kept synchronized and fully replicated, which ensures
that it is both fault tolerant and highly scalable.

Finally, Flume can only write data through a flow at the rate that
the final destination can accept it. Although Flume can buffer data
inside a flow to smooth out high-volume bursts, the output rate must
on average equal the input rate to avoid log jams. Thus, writing to a
scalable storage tier is advisable. For example, HDFS has been shown
to scale to thousands of machines and can handle many petabytes of
data.

=== Manageability

Manageability is the ability to control data flows, monitor nodes,
modify settings, and control the outputs of a large system. Manually
managing data flows from their sources to their end points is
tedious, error-prone, and a major pain point. With potentially
thousands of log-generating applications and services, it is
important to have a centralized management point from which to
monitor and change data flows, and the ability to dynamically handle
different conditions or problems.

The Flume Master is the point where global state such as the data
flows can be managed. Via the Flume Master, users can monitor flows
and reconfigure them on the fly. The Flume Master has the information
required to automatically respond to system changes such as load
imbalances, partial failures, or newly provisioned hardware.

You can dynamically reconfigure nodes by using the Flume Master.
Although this guide describes examples of traditional three-tier
deployments, the flexibility of the nodes allows for arbitrary node
topologies. You can reconfigure nodes by using small scripts written
in a flexible data flow specification language, which can be
submitted via the Flume Master interface.

You can administer the Flume Master through either of two interfaces:
a web interface or the scriptable Flume command shell. The web
interface provides interactive updates of the system's state. The
shell enables administration via manually crafted or
machine-generated scripts.

=== Extensibility

Extensibility is the ability to add new functionality to a system.
For example, you can extend Flume by adding connectors to existing
storage layers or data platforms. This is made possible by simple
interfaces, the separation of functional concerns into simple
composable pieces, a flow specification language, and a simple but
flexible data model.

Flume provides many common input and output connectors. When a new
input connector (source) is added, extra metadata fields specific to
that source can be attached to each event it produces. New connectors
reuse the common components that provide particular reliability and
resource-usage properties. General-purpose sources include files from
the file system, syslog and syslog-ng emulation, and the standard
output of a process. More specialized sources, such as IRC channels
and Twitter streams, can also be added.

Similarly, there are many output destinations for events. Although
HDFS is the primary output destination, events can be sent to local
files, to monitoring and alerting applications such as Ganglia, or to
communication channels such as IRC. To enable easy integration with
HDFS, MapReduce, and Hive, Flume provides simple mechanisms for
managing output files and output formats. Data gathered by Flume can
be processed easily with Hadoop and Hive.
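For example, output file management supports escape sequences that
bucket events by metadata such as the event's timestamp. The
following collector sink sketch (the path and file prefix are
illustrative) writes events into date-partitioned HDFS directories
that map naturally onto Hive partitions:

----
collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/weblogs/%Y-%m-%d/", "web-") ;
----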
=== Section summary

The preceding Introduction describes the high-level goals and
features of Flume. The following sections of this guide describe how
to set up and use Flume:

* A step-by-step tutorial that introduces a single Flume node
* An introduction to the Flume Master and a pseudo-distributed mode
  with multiple nodes coordinated by the Flume Master
* A description of a fully-distributed setup that also removes single
  points of failure
* Flume use cases and a description of how to integrate Flume with
  existing sources of data
* How to set up Flume's output for integration with heavyweight
  analysis systems such as Hadoop and Hive
* How to deploy Flume and set up arbitrary flows, including a
  specification of Flume's data flow specification language
* A catalog of components available in that language
* A description of experimental features
* Troubleshooting information