////////////////////
Licensed to Cloudera, Inc. under one or more contributor license
agreements.  See the NOTICE file distributed with this work for
additional information regarding copyright ownership.  Cloudera, Inc.
licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the
License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.  See the License for the specific language governing
permissions and limitations under the License.
////////////////////

== Pseudo-distributed Mode

Flume is intended to be run as a distributed system with processes spread out across _many_ machines. It can also be run as several processes on a _single_ machine, which is called ``pseudo-distributed'' mode. This mode is useful for debugging Flume data flows and for getting a better idea of how Flume components interact.

The previous section described a Flume node and introduced the concept of Flume sources. This section introduces some new concepts required for a distributed setup: the Flume *master* server, the specification of sources and *sinks*, and connecting multiple Flume *nodes*.

=== Starting Pseudo-distributed Flume Daemons

There are two kinds of processes in the system: the Flume *master* and the Flume *node*. The Flume Master is the central management point: it is the single logical entity that holds global state data, controls the Flume node data flows, and monitors the Flume nodes. Flume nodes serve as the data path for streams of events. They can be the sources, conduits, and consumers of event data. The nodes periodically contact the Master to transmit a heartbeat and to get their data flow configuration.

To get a distributed Flume system working, you must start a single Flume Master and some Flume nodes that interact with the Master. You'll start with a Master and one Flume node and then expand.

==== The Master

The Master can be started manually by executing the following command:

----
$ flume master
----

After the Master is started, you can access it by pointing a web browser to http://localhost:35871/. This web page displays the status of all Flume nodes that have contacted the Master, and shows each node's currently assigned configuration. When you start this up without Flume nodes running, the status and configuration tables will be empty.

image::master-empty.png[]

The web page contains four tables -- the 'Node status' table, the 'Node configuration' table, the 'Physical/Logical Node mapping' table, and a 'Command history' table. The information in these tables represents the current global state of the Flume system.

The Master's 'Node status' table contains the names of all of the Flume nodes talking to the Master, their current configuration version (initially "none"), their status (such as IDLE), and when each node last reported to the Master. The name of each Flume node should be the same as the output of running +hostname+ at a Unix prompt.

The Master's 'Node configuration' table contains the logical name of a node, the configuration version assigned to it, and a specification of its sources and its sinks. Initially, this table is empty, but after you change values you can view this web page to see the updates.
There are two sets of columns -- the user-entered version/source/sink and the translated version/source/sink. A later section of this guide describes translated configurations.

The Master's 'Physical/Logical Node mapping' table contains the mapping of logical nodes to their physical nodes.

The Master's 'Command history' table contains the state of commands. In general, *commands* modify the Master's global state. Commands are processed serially on a Master and are assigned a unique command ID number. Each command has a state (for example, SUCCEEDED, FAILED, or PENDING), a command line, and a message which often contains information about its execution attempt.

==== The Flume Node

To start a Flume node, invoke the following command in another terminal:

----
$ flume node_nowatch
----

NOTE: Normally, you start a node using +flume node+ or run the node as a daemon. For these examples, you disable the watchdog feature by starting with the +node_nowatch+ option. This option enables you to interact with a node via the console; the other options disable +stdin+.

To check whether a Flume node is up, point your browser to the 'Flume Node status page' at http://localhost:35862/. Each node displays its own data in a single table that includes diagnostics and metrics data about the node, its data flows, and the system metrics of the machine it is running on. If you have multiple instances of the Flume node program running on one machine, each additional instance automatically increments the port number, attempts to bind to the next port (35863, 35864, etc.), and logs the port it eventually selected.

If the node is up, you should also refresh the Master's status page (http://localhost:35871) to make sure that the node has contacted the Master. You brought up one node (assume the node is named +_host_+), so you should have one node listed in the Master's node status table, and an entry in the logical node mapping table that links the +_host_+ logical node to the +_host_+ physical node.

=== Configuring a Node via the Master

Requiring nodes to contact the Master to get their configuration enables you to dynamically change the configuration of nodes without having to log in to the remote machine to restart the daemon. You can quickly change the node's previous data flow configuration to a new one.

The following describes how to "wire" nodes using the Master's web interface. On the Master's web page, click on the config link. You are presented with two forms; these are web interfaces for setting the node's data flows. When Flume nodes contact the Master, they notice that the data flow version has changed, and then instantiate and activate the new configuration.

For this example, you will repeat the steps you did in the Quick Start section. Enter the following values into the "Configure a node" form, and then click Submit.

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +console+
Sink: +console+
----------------------------

Refresh the Master page and notice that the version stamp changed to a current time and that the source and sink fields of the node's configuration were updated. After the node's status changes to "ACTIVE", it is ready to receive console traffic.

On the master, a node can be in one of several states:

* HELLO : A new node instance initially contacted the master.
* IDLE : A node has completed its configuration or has no configuration.
* CONFIGURING: A node has received a configuration and is activating the configuration.
* ACTIVE: A node is actively pulling data from the source and pushing data into the sink.
* LOST: A node has not contacted the master for an extended period of time (by default, 10x the expected heartbeat period -- 50s).
* DECOMMISSIONED: A node has been purposely decommissioned from a master.
* ERROR: A node has stopped in an error state.

On the terminal where your Flume node is running, you should be able to type a few lines and then get output back showing your new log message.

Next, enter these values to output the contents of a file to the console:

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +text("/etc/services")+
Sink: +console+
----------------------------

NOTE: You may need to press Enter in the Flume node console.

Or use these new values to tail a file:

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +tail("/etc/services")+
Sink: +console+
----------------------------

You can now change the configuration of different nodes in the system to gather data from a variety of sources by going through the Master.

=== Introducing Sinks

Thus far, you have seen that Flume has a variety of sources that generate or accept new events that are fed into the system, but you have limited the output of these messages to the +console+ sink. As you would expect, Flume also provides a wide variety of event *sinks* -- destinations for all of the events. There are many possible destinations -- the local disk, HDFS, the console, or another machine across the network. You use the sink abstraction as a uniform interface for forwarding events to any of these destinations.

You can connect different sources to different sinks by specifying a new configuration and submitting it to the Master. For example, with the data flow below, you make a copy of +/etc/services+.

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +text("/etc/services")+
Sink: +text("services.copy")+
----------------------------

WARNING: The +text+ sink overwrites the output file if it already exists.

Notice that the file is copied as is. Sinks have optional arguments for the output format, which you can use to write data in other serialization formats. For example, instead of copying a file using the default "raw" formatter, you can format the output using other formatters such as "avrojson" (the Avro serialization JSON format), "avrodata" (the Avro serialization binary data format), or "debug" (the formatter used by the console sink).

If you enter:

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +text("/etc/services")+
Sink: +console("avrojson")+
----------------------------

the contents of the file are displayed to the console with each record in JSON format.

If you enter:

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +text("/etc/services")+
Sink: +text("services.json", "avrojson")+
----------------------------

the newly written local +services.json+ file is written in Avro's JSON format.
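Similarly, you could write the same data in Avro's binary data format. This is only a sketch -- the output file name +services.avro+ is a hypothetical example, not a name used elsewhere in this guide:

[grid="all"]
`----------`----------------
Node name: +_host_+
Source: +text("/etc/services")+
Sink: +text("services.avro", "avrodata")+
----------------------------

The resulting file contains the serialized events in Avro's binary format rather than JSON text.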
.Default Arguments
**********
Many sinks, sources, and decorators have arguments that modify their behavior. Some of these arguments are required; others are optional and use a default value. The following notation describes these parameters:

+_name_(_req1_, _req2_[, _opt1_[, _opt2_=_foo_]])+

where the component named _name_ has two required arguments, _req1_ and _req2_, and two optional arguments, _opt1_ and _opt2_. _opt1_ *must* be specified if _opt2_ is specified, and _foo_ is the default value for _opt2_.

For the +console+ examples above, console could be specified as:

+console[(_format_="debug")]+

and text could be specified as:

+text("_file_"[, _format_="raw"])+
**********

There are several sinks you can use. The following list is a subset; see the Appendix for more sinks.

.Flume Event Sinks
[horizontal]
+null+ :: Null sink. Events are dropped.

+console[("format")]+ :: Console sink. Display events to the console's stdout. The "format" argument is optional and defaults to the "debug" output format.

+text("_txtfile_"[,"format"])+ :: Textfile sink. Write the events to the text file +_txtfile_+ using the output format "format". The default format writes the "raw" event bodies with no metadata.

+dfs("_dfsfile_")+ :: DFS seqfile sink. Write serialized Flume events to a DFS path such as +hdfs://namenode/file+ or +file:///file+ in Hadoop's seqfile format. Note that because of HDFS write semantics, no data written to this sink becomes available until the sink is closed.

+syslogTcp("_host_",_port_)+ :: Syslog TCP sink. Forward events to +host+ on TCP port +port+ in syslog wire format (syslog-ng compatible), or to other Flume nodes set up to listen for syslogTcp.

WARNING: Using +dfs+ has some restrictions and requires some extra setup. File contents will not become available until after the sink has been closed. See the Troubleshooting section for details.

=== Aggregated Configurations

Using the single-node configuration form is manageable for a small number of machines, but for larger numbers it is more efficient to maintain or auto-generate the configurations for all of the machines in a single file. Flume allows you to set the configurations of many machines in a single aggregated configuration submission. Instead of the method you used in the "Configuring a Node via the Master" section, put the following configuration line into the "Configure Nodes" form and then submit:

----
host : text("/etc/services") | console ;
----

Or:

----
host : text("/etc/services") | text("services.copy");
----

The general format is one node configuration per line, each naming a node followed by its source and sink:

----
nodename1 : source | sink ;
nodename2 : source | sink ;
nodename3 : source | sink ;
...
----

The remainder of this guide uses this format to configure nodes.
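For example, a single aggregated submission can configure several nodes at once. The following sketch is illustrative only -- the node names +hostA+ and +hostB+ are hypothetical -- but it combines only sources and sinks that were introduced earlier in this section:

----
hostA : text("/etc/services") | text("services.copy") ;
hostB : tail("/etc/services") | console("avrojson") ;
----

Each line maps the node named before the colon to the source and sink that follow it, exactly as in the single-node form.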
=== Tiering Flume Nodes: Agents and Collectors

A simple network connection is abstractly just another sink. It would be great if sending events over the network were easy, efficient, and reliable. In reality, collecting data from a distributed set of machines and relying on network connectivity greatly increases the likelihood and kinds of failures that can occur. The bottom line is that providing reliability guarantees introduces complexity and many tradeoffs.

Flume simplifies these problems by providing a predefined topology and tunable reliability that only requires you to give each Flume node a role. One simple Flume node topology classifies Flume nodes into two roles -- a Flume *agent* tier and the Flume *collector* tier. The agent tier Flume nodes are co-located on machines with the service that is producing logs. For example, you could specify a Flume agent configured to have +syslogTcp+ as a source, and configure the syslog-generating server to send its logs to the specified local port. This Flume agent would have an +agentSink+ as its sink, configured to forward data to a node in the collector tier. Nodes in the collector tier listen for data from multiple agents, aggregate the logs, and then eventually write the data to HDFS.

WARNING: In the next few sections, all +_host_+ arguments use the physical IP or DNS name of the node, as opposed to the Flume node name (set by the +-n name+ option). The default Flume node name is the host name unless you override it on the command line.

To demonstrate the new sinks in pseudo-distributed mode, you will instantiate another Flume node (a physical node) on the local box. To do this, you need to start the Flume node with some extra options. The command line below starts a physical node named +collector+ (+-n collector+):

----
$ flume node_nowatch -n collector
----

On the Master's web page, you should eventually see two nodes: +_host_+ and +collector+. The Flume Node status web pages should be available at http://localhost:35862 and http://localhost:35863. Port bindings depend on instantiation order -- the first physical node instantiated binds on 35862 and the second binds to 35863.

Next, configure +collector+ to take on the role of a collector, and set up +_host_+ to send data from the console to the collector, by using the aggregated multiple-node configuration form. The agent uses +agentSink+, a high-reliability network sink. The collector node's source is configured to be a +collectorSource+, and its sink is configured to be the +console+.

----
host : console | agentSink("localhost",35853) ;
collector : collectorSource(35853) | console ;
----

["graphviz", "tiers-console.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;

  node [shape=none]
  "agent tier" -> "collector tier" [label="35872", fontsize=10];
  "collector tier" -> "storage tier" [label="xxx", fontsize=10];

  node [shape=record];
  { rank=same; "agent tier"; host; }
  { rank=same; "collector tier"; "collector" }

  host -> collector;
}
---------------------------------------------------------------------

When you type lines in +_host_+'s console, events are forwarded to the collector. Currently, there is a bit of latency (15s or so) before the forwarded message shows up on +collector+. This is a configurable setting whose default is amenable to high event throughput; later sections describe how to tune Flume. You have successfully made an event flow from an agent to the collector.

TIP: You can check whether +agentSink+ is working by looking in the sink's write-ahead logging directory. The default location is +/tmp/flume/+. Because the OS periodically cleans this directory, this configuration property (+flume.agent.logdir+) should be changed for a production environment.

A more interesting setup is to have the agent tail a local file (using the +tail+ source) or listen for local syslog data (using the +syslogTcp+ or +syslogUdp+ sources and modifying the syslog daemon's configuration). Instead of writing to a console, the collector would write to a +collectorSink+, a smarter sink that writes to disk or HDFS, periodically rotates files, and manages acknowledgements; a sketch of the +tail+ variant follows the figure below.

["graphviz", "tiers-hdfs.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;

  node [shape=none]
  "agent tier" -> "collector tier" [label="35872", fontsize=10];
  "collector tier" -> "storage tier" [label="xxx", fontsize=10];

  node [shape=record];
  { rank=same; "agent tier"; host; }
  { rank=same; "storage tier"; "HDFS" }
  { rank=same; "collector tier"; "collector" }

  host -> collector -> HDFS;
}
---------------------------------------------------------------------
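As a sketch of the +tail+ variant mentioned above, the agent could follow a local log file while the collector writes the collected events to local disk. The file path +/var/log/messages+ and the +"tailed"+ string are hypothetical examples; the second +collectorSink+ argument simply mirrors the syslog configuration that follows.

----
host : tail("/var/log/messages") | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("file:///tmp/flume/collected", "tailed");
----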
The following configuration is for an agent that listens for syslog messages and forwards them to a collector, which writes files to the local directory +/tmp/flume/collected/+.

----
host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("file:///tmp/flume/collected", "syslog");
----

In the following slightly modified configuration, the collector writes to an HDFS cluster (assuming the HDFS NameNode is called +namenode+):

----
host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("hdfs://namenode/user/flume/","syslog");
----

NOTE: There is no guarantee that data written to an HDFS file is durable until the HDFS file is properly closed. Because of this, the collector sink periodically closes a file and creates a new one in HDFS. The default time between file rolls (closing the current file and opening a new one) is 30s. If you are writing data at low throughput (<2MB/s), you may want to increase the default roll time by modifying the +flume.collector.roll.millis+ and +flume.agent.logdir.retransmit+ time properties in your flume-site.xml file.

=== Section Summary

This section described how to start a Master and a node, and how to configure a node via the Master. It also introduced a more concise way of specifying many configurations, explained agents and collectors, and showed how to build an agent-collector pipeline on a single machine in a three-tier topology.