////////////////////
Licensed to Cloudera, Inc. under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. Cloudera, Inc. licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
////////////////////

== Fully-distributed Mode

The main goal for Flume is to collect logs and data from many different hosts and to scale and intelligently handle different cluster and network topologies. To deploy Flume on your cluster, do the following steps.

.Steps to Deploy Flume On a Cluster
* Install Flume on each machine.
* Select one or more nodes to be the Master.
* Modify a static configuration file to use site-specific properties.
* Start the Flume Master node on at least *one* machine.
* Start a Flume node on *each* machine.

The following section describes how to manually configure the properties file to specify the Master for each node, and how to set default values for parameters. Subsequent sections describe a data flow configuration for a larger system, how to add more capacity by adding collectors, and how to improve the reliability of the Master by adding multiple Masters.

=== Static Configuration Files

In the previous sections, you used Flume on a single machine with the default configuration settings. With the default settings, nodes automatically search for a Master on `localhost` on a standard port. In order for the Flume nodes to find the Master in a fully distributed setup, you must specify site-specific static configuration settings.

Site-specific settings for Flume nodes and Masters are configured by properties in the `conf/flume-site.xml` file found on each machine. If this file is not present, the commands default to the settings found in `conf/flume-conf.xml`. In the following example, you set up the property that points a Flume node to search for its Master at a machine called `master`.

// xml source code highlighting -- requires source-highlight library.
.`conf/flume-site.xml`
[source,xml]
------------------------------------------------------------------------------------
<configuration>
  <property>
    <name>flume.master.servers</name>
    <value>master</value>
  </property>
</configuration>
------------------------------------------------------------------------------------

==== Using Default Values

When you are using agent/collector roles, you can add the following configuration properties to your +flume-site.xml+ file to set up the default host used as the collector.

[source,xml]
------------------------------------------------------------------------------------
...
  <property>
    <name>flume.collector.event.host</name>
    <value>collector</value>
    <description>This is the host name of the default "remote" collector.</description>
  </property>

  <property>
    <name>flume.collector.port</name>
    <value>35853</value>
    <description>This is the default TCP port that the collector listens on in order
    to receive the events it is collecting.</description>
  </property>
...
------------------------------------------------------------------------------------

This makes an `agentSink` with no arguments use `flume.collector.event.host` and `flume.collector.port` as its default target and port.
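Putting these pieces together, a node's complete `conf/flume-site.xml` might look like the following minimal sketch. The host names `master` and `collector` are just the example names used above; substitute your own.

[source,xml]
------------------------------------------------------------------------------------
<configuration>
  <!-- Where this node looks for its Flume Master. -->
  <property>
    <name>flume.master.servers</name>
    <value>master</value>
  </property>

  <!-- Default destination used by agentSink when no host/port is given. -->
  <property>
    <name>flume.collector.event.host</name>
    <value>collector</value>
  </property>
  <property>
    <name>flume.collector.port</name>
    <value>35853</value>
  </property>
</configuration>
------------------------------------------------------------------------------------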
The following example shows a larger setup in which several agents push data to a single collector. There are seven Flume nodes -- six in the agent tier, and one in the collector tier.

["graphviz", "singleCollector.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;
  node [shape=record];
  "agent A" -> collector -> HDFS;
  "agent B" -> collector;
  "agent C" -> collector;
  "agent D" -> collector;
  "agent E" -> collector;
  "agent F" -> collector;
  node [shape=none]
  "agent tier" -> "collector tier" -> "storage tier"
}
---------------------------------------------------------------------

An explicit configuration fills in all of the parameters:

----
agentA : src | agentSink("collector",35853);
agentB : src | agentSink("collector",35853);
agentC : src | agentSink("collector",35853);
agentD : src | agentSink("collector",35853);
agentE : src | agentSink("collector",35853);
agentF : src | agentSink("collector",35853);
collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/","srcdata");
----

NOTE: When specifying destinations for agentSinks, use the _hostname and port of the target machine_. The default name for a node is its hostname. However, if there are multiple logical nodes, you must use the machine's host name, _not the name of the logical node_. In the preceding examples, agent[A-F] and collector are the _physical host names_ of the machines where these configurations are running.

You can rely on the default ports set in the configuration files:

----
agentA : src | agentSink("collector");
agentB : src | agentSink("collector");
agentC : src | agentSink("collector");
agentD : src | agentSink("collector");
agentE : src | agentSink("collector");
agentF : src | agentSink("collector");
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");
----

You can also rely on the default ports and the default collector host:

----
agentA : src | agentSink
agentB : src | agentSink
agentC : src | agentSink
agentD : src | agentSink
agentE : src | agentSink
agentF : src | agentSink
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");
----

WARNING: Using defaults can make writing data flow configurations more concise, but may obscure the details of how different nodes are connected to each other.

.Reliability Modes
****************
You can tune the reliability level of the agents simply by specifying a different kind of agent sink. There are three levels available, and three corresponding agents:

+agentE2ESink[("_machine_"[,_port_])]+ :: End-to-end. This version uses the WAL, relies on an acknowledgement, and will retry if no acknowledgement is received.

+agentDFOSink[("_machine_"[,_port_])]+ :: Disk failover (store on failure). This agent writes to disk only if it detects a failure on the collector. If any data is stored to disk, it periodically retries the network connection and attempts to resend.

+agentBESink[("_machine_"[,_port_])]+ :: Best effort. This agent does not write to disk at all, and drops messages in the event of collector failures.

The previous examples use the +agentSink+. This is an alias for the +agentE2ESink+.
****************
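For example, if it were acceptable to lose events while the collector is unreachable, the same agents could trade end-to-end guarantees for the best-effort sink instead. A minimal sketch, reusing the host name and port from the examples above:

----
agentA : src | agentBESink("collector",35853);
----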
=== Multiple Collectors

Having multiple collectors can increase the log collection throughput and can improve the timeliness of event delivery by increasing collector availability. Data collection is parallelizable; thus, load from many agents can be shared across several collectors.

==== Partitioning Agents across Multiple Collectors

The preceding graph and dataflow spec show a typical topology for Flume nodes. For reliable delivery, in the event that the collector stops operating or disconnects from the agents, the agents would need to store their events to their respective local disks. The agents would then periodically attempt to recontact a collector. Because the collector is down, any downstream analysis or processing is blocked.

["graphviz", "multiCollector.png"]
---------------------------------------------------------------------
digraph multi_collector {
  rankdir=LR;
  node [shape=record];
  collector [label="<A> collector A|<B> collector B|<C> collector C"];
  collector:A -> HDFS;
  collector:B -> HDFS;
  collector:C -> HDFS;
  "agent A" -> collector:A;
  "agent B" -> collector:A;
  "agent C" -> collector:B;
  "agent D" -> collector:B;
  "agent E" -> collector:C;
  "agent F" -> collector:C;
  node [shape=none]
  "agent tier" -> "collector tier" -> "storage tier"
}
---------------------------------------------------------------------

When you have multiple collectors as in the preceding graph and dataflow spec, downstream progress is still made even in the face of a collector's failure. If collector B goes down, agent A, agent B, agent E, and agent F continue to deliver events via collector A and collector C respectively. Agent C and agent D may have to queue their logs until collector B (or its replacement) comes back online.

The following configuration partitions the work from the set of agents across many collectors. In this example, each of the collectors specifies the same DFS output directory and file prefixes, aggregating all of the logs into the same directory.

----
agentA : src | agentE2ESink("collectorA",35853);
agentB : src | agentE2ESink("collectorA",35853);
agentC : src | agentE2ESink("collectorB",35853);
agentD : src | agentE2ESink("collectorB",35853);
agentE : src | agentE2ESink("collectorC",35853);
agentF : src | agentE2ESink("collectorC",35853);
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
----

==== Manually Specifying Failover Chains

["graphviz", "failoverCollector.png"]
---------------------------------------------------------------------
digraph multi_collector {
  rankdir=LR;
  node [shape=record];
  collector [label="<A> collector A|<B> collector B|<C> collector C"];
  collector:A -> HDFS;
  collector:B -> HDFS [ style="dotted" label = "down" ];
  collector:C -> HDFS;
  "agent A" -> collector:A;
  "agent B" -> collector:A;
  "agent C" -> collector:B [ style="dotted" label = "down" ];
  "agent D" -> collector:B [ style="dotted" label = "down" ];
  "agent C" -> collector:A;
  "agent D" -> collector:C;
  "agent E" -> collector:C;
  "agent F" -> collector:C;
  node [shape=none];
  "agent tier" -> "collector tier" -> "storage tier"
}
---------------------------------------------------------------------

When you have multiple collectors writing to the same storage location, instead of having agent C and agent D queue indefinitely, you can have them fail over to other collectors. In this scenario, you could have agent C and agent D fail over to collector A and collector C respectively, while periodically checking to see if collector B has returned. To specify these setups, use agents with *failover chains*. As with single-collector agents, there are three levels of reliability for the failover chain agents: +agentE2EChain+, +agentDFOChain+, and +agentBEChain+.
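For instance, an agent that should spill to local disk only when both of its collectors are unreachable could use a disk-failover chain. A sketch, using the same host names and chain syntax as the full example that follows:

----
agentC : src | agentDFOChain("collectorB:35853","collectorA:35853");
----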
In the following example, you manually specify the failover chain using +agentE2EChain+, an agent with end-to-end reliability and multiple failover collectors. In this setup, +agentA+ initially attempts to send to +collectorA+ on port +35853+. The second argument to +agentA+'s sink specifies the collector to fall back on if the first fails. You can specify an arbitrary number of collectors, but you must specify at least one.

----
agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
----

NOTE: In this section, +agent[A-F]+ and +collector[A-C]+ are physical host names.

As in the single collector case, if no port number is specified, the agent defaults to the port set by +flume.collector.port+.

----
agentA : src | agentE2EChain("collectorA","collectorB");
agentB : src | agentE2EChain("collectorA","collectorC");
agentC : src | agentE2EChain("collectorB","collectorA");
agentD : src | agentE2EChain("collectorB","collectorC");
agentE : src | agentE2EChain("collectorC","collectorA");
agentF : src | agentE2EChain("collectorC","collectorB");
collectorA : collectorSource | collectorSink("hdfs://...","src");
collectorB : collectorSource | collectorSink("hdfs://...","src");
collectorC : collectorSource | collectorSink("hdfs://...","src");
----

.Reliability and failover chain semantics
****************
* The +agentE2EChain+ always _writes to the WAL first_ and then attempts to send to the different collectors. In the event of a collector failure, it fails over to another collector.

* The +agentDFOChain+ always attempts to send data to the different collectors, and _only writes to disk if all collectors fail_.

* The +agentBEChain+ attempts to send messages to each collector in succession in the event of failures. If all collectors fail, it drops messages.
****************

==== Automatic Failover Chains

WARNING: The automatic failover chain feature does not currently work when using multiple masters.

Flume also provides a mechanism that automatically assigns failover chains based on how nodes are configured. As collector nodes are assigned in the Flume Master, the Master attempts to distribute the agents evenly amongst the collectors. Each agent is assigned a different failover chain, which reduces the chance that any one of the remaining collectors becomes overloaded when a collector fails.

To specify that a node uses the automatic failover chains, use the +autoE2EChain+, +autoDFOChain+, or +autoBEChain+ agent sink. Because the Master calculates the failover chains, these sinks take no explicit arguments.
----
agentA : src | autoE2EChain ;
agentB : src | autoE2EChain ;
agentC : src | autoE2EChain ;
agentD : src | autoE2EChain ;
agentE : src | autoE2EChain ;
agentF : src | autoE2EChain ;
collectorA : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorB : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorC : autoCollectorSource | collectorSink("hdfs://...", "src");
----

The Master updates the configuration of the agents based on the current collectors in the system. When new collectors are added to the system, the Master updates the failover chains of the agents to rebalance the load.

NOTE: If the Master has no nodes with +autoCollectorSource+ as their source, the agents' automatic chains report a +fail("...")+ chain, which waits until nodes with +autoCollectorSource+ are specified. If the nodes are not mapped, they report a different +fail+ sink notifying you that the node is unmapped (not associated with a host and port).

TIP: You can see the translation of the auto*Chain configuration in the node configuration table, under the translated configuration column. The translation is a small declarative specification of the failure recovery behavior of the sink. More details are in the Advanced section of this guide; future revisions will also present the translations for the agents and other chains.

include::LogicalNodes[]

=== Multiple Masters

WARNING: The automatic failover chain, automatic flow isolation, and logical source/sink features do not currently work when using multiple masters.

The Master has two main jobs to perform. The first is to keep track of all the nodes in a Flume deployment and to keep them informed of any changes to their configuration. The second is to track acknowledgements from the end of a Flume flow that is operating in *reliable mode*, so that the source at the top of that flow knows when to stop retransmitting an event.

Both of these jobs are critical to the operation of a Flume deployment. Therefore, it is ill-advised to have the Master live on a single machine, as this represents a single point of failure for the whole Flume service (see 'failure modes' for more detail). Flume therefore supports multiple Masters, which run on physically separate nodes and coordinate amongst themselves to stay synchronized. If a single Master fails, the other Masters can take over its duties and keep all live flows functioning. This all happens transparently, with a little effort at configuration time. Nodes automatically fail over to a working Master when they lose contact with their current Master.

==== Standalone Mode Compared to Distributed Mode

The Flume Master can be run in one of two ways.

- *Standalone* mode - the Master runs on a single machine. This is easy to administer and simple to set up, but has disadvantages when it comes to scalability and fault tolerance.

- *Distributed* mode - the Master is configured to run on several machines, usually three or five. This option scales to serve many flows, and also has good fault-tolerance properties.

Large production deployments of Flume should run a distributed Master so that inevitable machine failures do not impact the availability of Flume itself. For small deployments the issue is less clear-cut: a distributed Master means reserving more computing resources that could be used instead for nodes or other services, and it is possible to recover from many failure modes in a timely manner with human intervention.
The choice between distributed and standalone Masters ultimately depends on your use case and your operating requirements.

==== Running in Standalone Mode

*********************************
Standalone mode is where Flume has only one Master node. The configuration described below needs to be done on that machine only.
*********************************

Whether the Flume Master starts in distributed or standalone mode is indirectly controlled by how many machines are configured to run as Master servers. To run in standalone mode, a single configuration property, +flume.master.servers+, must be set:

---------------------------------------
<property>
  <name>flume.master.servers</name>
  <value>hostA</value>
</property>
---------------------------------------

The value of +flume.master.servers+ is a comma-separated list of all the machine names (or IP addresses) that will be Master servers. If this list contains only one machine name, the Flume Master will start in standalone mode. If there is more than one machine name in the list, the Flume Master will start in distributed mode.

There is no other configuration required for standalone mode. Flume will use reasonable default values for any other Master-related variables.

To start the Master, type the following at the command prompt from +$FLUME_HOME+:

-----------------------
$ flume master
-----------------------

A number of log messages should print to the screen. After the server is running, you can check that everything is working properly by visiting the web interface at +http://master-node-ip:35871/+, where +master-node-ip+ is the IP address (or hostname) of the Master node. If you see a web page, the Master is running.

==== Running in Distributed Mode

**************************
Distributed mode runs the Flume Master on several machines. Therefore the configuration described below should be done on *every* Master machine, except where noted.
**************************

Running the Flume Master in distributed mode provides better fault tolerance than standalone mode, and scalability for hundreds of nodes.

Configuring machines to run as part of a distributed Flume Master is nearly as simple as standalone mode. As before, +flume.master.servers+ needs to be set, this time to a list of machines:

---------------------------------------
<property>
  <name>flume.master.servers</name>
  <value>masterA,masterB,masterC</value>
</property>
---------------------------------------

.How many machines do I need?
******************************
The distributed Flume Master will continue to work correctly as long as more than half the physical machines running it are still working and haven't crashed. Therefore if you want to survive one fault, you need three machines (because 3-1 = 2 > 3/2). For every extra fault you want to tolerate, add another two machines, so for two faults you need five machines.

Note that having an even number of machines doesn't make the Flume Master any more fault-tolerant: four machines only tolerate one failure, because if two were to fail only two would be left functioning, which is not more than half of four. Common deployments should be well served by three or five machines.
******************************

The final property to set is *not* the same on every machine: every node in the Flume Master must have a unique value for +flume.master.serverid+.

.Note
*****************************
+flume.master.serverid+ is the only Flume Master property that _must_ be different on every machine in the ensemble.
*****************************

.masterA
[source,xml]
--------------------------------------
<property>
  <name>flume.master.serverid</name>
  <value>0</value>
</property>
--------------------------------------

.masterB
[source,xml]
--------------------------------------
<property>
  <name>flume.master.serverid</name>
  <value>1</value>
</property>
--------------------------------------

.masterC
[source,xml]
--------------------------------------
<property>
  <name>flume.master.serverid</name>
  <value>2</value>
</property>
--------------------------------------

The value of +flume.master.serverid+ for each node is the index of that node's hostname in the +flume.master.servers+ list, starting at 0. For example, +masterB+ has index 1 in that list. The purpose of this property is to allow each node to uniquely identify itself to the other nodes in the Flume Master.

This is all the configuration required to start a three-node distributed Flume Master. To test this out, we can start the Master process on all three machines:

--------------------------------
[flume@masterA] flume master

[flume@masterB] flume master

[flume@masterC] flume master
--------------------------------

Each Master process will initially try to contact all the other nodes in the ensemble. Until more than half of the nodes (in this case, two) are alive and contactable, the configuration store will be unable to start, and the Flume Master will not be able to read or write configuration data.

You can check the current state of the ensemble by inspecting the web page of any of the Flume Master machines, found by default at, for example, +http://masterA:35871+.

==== Configuration Stores

The Flume Master stores all its data in a *configuration store*. Flume has a pluggable configuration store architecture and supports two implementations.

- The Memory-Backed Config Store (MBCS) stores configurations temporarily in memory. If the Master node fails and reboots, all the configuration data will be lost. The MBCS is incompatible with distributed Masters. However, it is very easy to administer, computationally lightweight, and good for testing and experimentation.

- The ZooKeeper-Backed Config Store (ZBCS) stores configurations persistently and takes care of synchronizing them between multiple Masters.

.Flume and Apache ZooKeeper
************************************************
Flume relies on the Apache ZooKeeper coordination platform to provide reliable, consistent, and persistent storage for node configuration data. A ZooKeeper ensemble is made up of two or more nodes which communicate regularly with each other to make sure each is up to date. Flume embeds a ZooKeeper server inside the Master process, so starting and maintaining the service is taken care of. However, if you have an existing ZooKeeper service running, Flume supports using that external cluster as well.
************************************************

==== Which Configuration Store Should I Use?

In almost all cases, you should use the ZBCS. It is more reliable and fault-tolerant, and will recover configurations after a restart. It is compatible with both standalone and distributed deployments of the Flume Master. The MBCS is appropriate if you are experimenting with Flume and can stand to lose configuration if the machine fails.

ZBCS is the default configuration store. The choice of which configuration store to use is controlled by the +flume.master.store+ property:

[source,xml]
--------------------------------------
<property>
  <name>flume.master.store</name>
  <value>zookeeper</value>
</property>
--------------------------------------

If set to +memory+, the Flume Master will use the MBCS instead. This is only supported in standalone mode.
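For example, to run a throwaway standalone Master for experimentation, keeping its configuration only in memory, you could override the store in +conf/flume-site.xml+. A minimal sketch:

[source,xml]
--------------------------------------
<property>
  <name>flume.master.store</name>
  <value>memory</value>
  <!-- MBCS: all configuration is lost when the Master process stops. -->
</property>
--------------------------------------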
==== Configuring the ZBCS

Most deployments using the ZBCS can use Flume's default configuration. However, where more control over the precise configuration of the Flume Master is needed, there are several properties that you can set.

Log Directory - +flume.master.zk.logdir+ :: To ensure reliability and the ability to restore its state in the event of a failure, the ZBCS continually logs all updates it sees to the directory set by +flume.master.zk.logdir+. This directory must be writable by the user that Flume runs as, and will be created if it doesn't exist at start-up time.

WARNING: Do not delete this directory, or any files inside it. If deleted, all your configuration information will be lost.

ZBCS Server Ports :: Each machine in the distributed Flume Master communicates with every other on the TCP ports set by +flume.master.zk.server.quorum.port+ and +flume.master.zk.server.election.port+. The defaults are 3182 and 3183 respectively. Note that these settings control both the ports on which the ZBCS listens and the ports on which it looks for other machines in the ensemble.

ZBCS Client Port - +flume.master.zk.client.port+ :: The Flume Master process communicates with ZooKeeper (either on the same machine, or remotely on another Master server) via a client TCP port, which is set by +flume.master.zk.client.port+. The default is 3181.

==== Gossip in Distributed Mode

Flume Master servers also use a 'gossip' protocol to exchange information among themselves. Each server periodically wakes up and picks another machine to send new data to. This protocol uses TCP port 57890 by default, but the port is controlled via the +flume.master.gossip.port+ property:

--------------------------------------
<property>
  <name>flume.master.gossip.port</name>
  <value>57890</value>
</property>
--------------------------------------

In standalone mode, there is no need to use gossip, so this port is unused.
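For instance, if the defaults clash with other services on the Master machines, all of the ZBCS and gossip settings described above can be overridden in +conf/flume-site.xml+. The following sketch uses arbitrary example values (a hypothetical log directory and alternative ports), not recommendations:

[source,xml]
--------------------------------------
<!-- Example overrides only; the property names are Flume's, the values are placeholders. -->
<property>
  <name>flume.master.zk.logdir</name>
  <value>/var/lib/flume/zk-logs</value>  <!-- must be writable by the Flume user -->
</property>
<property>
  <name>flume.master.zk.server.quorum.port</name>
  <value>4182</value>
</property>
<property>
  <name>flume.master.zk.server.election.port</name>
  <value>4183</value>
</property>
<property>
  <name>flume.master.zk.client.port</name>
  <value>4181</value>
</property>
<property>
  <name>flume.master.gossip.port</name>
  <value>57891</value>
</property>
--------------------------------------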
==== Diagrams: How the Masters and Nodes talk to each other

["graphviz", "master-zk-standalone.png"]
.Flume Master: Standalone Mode
---------------------------------------------------------------------
digraph multi_collector {
  label="Flume Master: Standalone Mode";
  rankdir=LR;
  edge [arrowhead="none"];
  node [shape=none]
  nodes -> masters [label="flume.master.config.port", fontsize=10];
  masters -> zookeeper [label="flume.master.store.port", fontsize=10];
  node [shape=Mrecord]
  edge [style="bold", weight=10]
  FM1 -> ZK1;
  subgraph cluster_FM1 { label="Flume Master 1"; FM1; ZK1; }
  subgraph cluster_ZK { { rank=same; ZK1 } label="ZooKeeper Ensemble"; }
  node [shape=none height=.15 fontsize=10]
  edge [style="filled" arrowhead="none"];
  node1 -> FM1:w; node2 -> FM1:w; node3 -> FM1:w;
  node4 -> FM1:w; node5 -> FM1:w; node6 -> FM1:w;
  node7 -> FM1:w; node8 -> FM1:w; node9 -> FM1:w;
  { rank=same; FM1; masters }
}
---------------------------------------------------------------------

["graphviz", "master-zk-internal.png"]
.Flume Master: Distributed Mode
---------------------------------------------------------------------
digraph multi_collector {
  label="Flume Master: Distributed Mode";
  rankdir=LR;
  edge [arrowhead="none"];
  node [shape=none]
  nodes -> masters [label="flume.master.config.port", fontsize=10];
  masters -> masters [label="flume.master.gossip.port", fontsize=10];
  masters -> zookeeper [label="flume.master.zk.client.port", fontsize=10];
  zookeeper -> zookeeper [label="flume.master.zk.server.{quorum,election}.port", fontsize=10];
  node [shape=Mrecord]
  edge [style="bold", weight=10]
  FM1 -> ZK1; FM2 -> ZK2; FM3 -> ZK3;
  subgraph cluster_FM1 { label="Flume Master 1"; FM1; ZK1; }
  subgraph cluster_FM2 { label="Flume Master 2"; FM2; ZK2; }
  subgraph cluster_FM3 { label="Flume Master 3"; FM3; ZK3; }
  subgraph cluster_ZK {
    { rank=same; ZK1; ZK2; ZK3}
    label="ZooKeeper Ensemble";
    edge [style="filled" arrowhead="none"];
    ZK1:e->ZK3:e; ZK2:e->ZK3:e; ZK1:e->ZK2:e;
  }
  subgraph cluster_Gossip {
    { rank=same; FM1; FM2; FM3}
    label="Gossip";
    edge [style="filled" arrowhead="none"];
    FM1:e->FM3:e; FM2:e->FM3:e; FM1:e->FM2:e;
  }
  node [shape=none height=.15 fontsize=10]
  edge [style="filled" arrowhead="none"];
  node1 -> FM1:w; node2 -> FM1:w; node3 -> FM1:w;
  node4 -> FM2:w; node5 -> FM2:w; node6 -> FM2:w;
  node7 -> FM3:w; node8 -> FM3:w; node9 -> FM3:w;
  { rank=same; FM1; FM2; FM3; masters }
}
---------------------------------------------------------------------

==== Configuring Flume Nodes to Connect to Multiple Master Servers

Only one property needs to be set to configure a Flume node to connect to multiple Masters: +flume.master.servers+.

[source,xml]
--------------------------------------
<property>
  <name>flume.master.servers</name>
  <value>masterA,masterB,masterC</value>
</property>
--------------------------------------

The nodes connect over the port +flume.master.heartbeat.port+ on each machine in the Flume Master; this is the port that the Master servers listen on for node heartbeats. If a Master server fails, nodes automatically fail over to the next randomly selected Master server that they can establish a connection to.

=== External ZooKeeper Cluster

In some cases you may want a ZBCS that relies on an externally managed ZooKeeper service. The most common example is where multiple services that rely on ZooKeeper are in use (for example, Flume and HBase). In the following example, replace zkServer{A,B,C}:2181 with the hostname/port of the ZooKeeper servers that make up your ensemble.
.`conf/flume-site.xml`
[source,xml]
--------------------------------------
<property>
  <name>flume.master.zk.use.external</name>
  <value>true</value>
</property>
<property>
  <name>flume.master.zk.servers</name>
  <value>zkServerA:2181,zkServerB:2181,zkServerC:2181</value>
</property>
--------------------------------------

=== Section Summary

This section described installing, deploying, and configuring a set of Flume nodes in a fully distributed setting. You should now be able to collect streams of logs with Flume. You also used some roles for sources and sinks to connect nodes together, and you now have an understanding of the basics of setting up a set of Flume nodes.

Here are the new sources and sinks introduced in this section.

// These tables are kinda gross, had to use asciidoc v8.2.7 style.

.Flume's Tiered Event Sources
+collectorSource[(_port_)]+ :: Collector source. Listens for data from agentSinks forwarding to port +_port_+. If the port is not specified, the node uses the default collector TCP port, 35853.