Getting Started with Flume
==========================
Jonathan Hsieh
v0.2.0, 1/25/10
(c) Copyright (2009), Cloudera Inc. All rights reserved.

// Don't know how to get multiple authors.
// Don't know how to get a TOC

[[Introduction]]
== Introduction

Flume is a distributed, reliable, available service for efficiently
moving large amounts of data as it is produced. We're concentrating
on reliable logging, but really it's a high-performance conduit for
shipping data around a cluster.

The primary use case is as a logging system that tails a bunch of log
[...]

We have a simple but flexible data model. Events, which are
unstructured blobs, can be annotated with attributes, which are
key-value pairs. We can serialize this in pretty much any format you
like when the data leaves Flume.

Flume's architecture is simple and robust. A typical deployment might
have three tiers. The first is a Flume agent tier (leaf nodes) that
forwards to a second tier of Flume collector nodes, which in turn
aggregate the data and then forward it to a storage tier such as
Hadoop's HDFS. Flume agents are typically installed on the machines
that generate the data and are Flume's initial point of contact for
data. Once data gets acked by a Flume agent node, we guarantee that,
as long as the node remains live, the data will eventually get to its
final destination. As Flume forwards the data to upstream nodes, some
intermediate processing can be performed before the data is forwarded
again.

One nice feature of Flume is that the configuration is centrally
managed. Nodes can be reconfigured dynamically without having to ssh
to a machine, tweak some conf files, and restart a daemon. This gives
the system the flexibility to tolerate network failures dynamically
and to change the deployment architecture to best suit your network
topology.

This document describes using and setting up Flume in three stages.
The first configures a Flume node and gets it working on a single
machine. The second explains how to configure a pseudo-distributed
multi-node topology on the localhost. Finally, the third stage
addresses distributed deployment and introduces mechanisms for
deploying Flume onto other nodes in the cluster.

[[Quickstart]]
== Quickstart

In this section we get a single Flume node up and running and verify
that the basic prerequisites for Flume are installed and working. We
assume that you have installed Flume (by untarring it) to the `~/flume`
directory.

.Prerequisites
****
* Flume
* A Linux system (tested on CentOS 5.3 and Ubuntu 9.04, should work on Mac OS X)
* Java 1.6 (only tested with Sun JRE 1.6)
****

We will start by seeing if we can get a Flume node running.
After this we will introduce a handful of data *sources*, and then
describe how to arbitrarily configure a node.

=== Sources and the `dump` command

First let's get a Flume node up that echoes data typed on the console
(stdin) back out to the console (stdout). We can do this by running
the following command.

----
$ bin/flume dump console
----

TIP: The Flume program has the general form `bin/flume <command> [args
...]`.

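
If the `dump` command above fails to start, it can help to sanity-check the prerequisites first. The following is only a minimal sketch and is not part of Flume: `check_flume_install` and `check_java` are hypothetical helper names, and the sketch assumes Flume was untarred to `~/flume` as described earlier and that `java` is expected to be on the `PATH`.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flume): succeeds if DIR looks like an
# untarred Flume install, i.e. DIR/bin/flume exists and is executable.
check_flume_install() {
  dir=$1
  if [ -x "$dir/bin/flume" ]; then
    echo "found $dir/bin/flume"
    return 0
  fi
  echo "no executable bin/flume under $dir" >&2
  return 1
}

# Hypothetical helper: reports whether a java binary is on the PATH.
# It does not check the version; this guide was only tested with
# Sun JRE 1.6.
check_java() {
  if command -v java >/dev/null 2>&1; then
    echo "java found at $(command -v java)"
    return 0
  fi
  echo "java not found on PATH" >&2
  return 1
}
```

With these functions defined in the current shell, something like `check_flume_install "$HOME/flume" && check_java` should succeed before `bin/flume dump console` is attempted.
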

TIP: The command above uses the `dump` command, and `console` is its
argument. Its general form is `bin/flume dump <source>`.

We have started a Flume node where `console` is the source of incoming
data. When you run it you should see some logging messages displayed
on the console. For now, you can ignore messages about masters,
back-off and failed connections (we'll touch on this later). When you
type a line at the console and press Enter, you should see a new log
entry appear containing the data that you typed. If you enter `This is
a test`, the output should look similar to this:

----
hostname [INFO Thu Nov 19 08:37:13 PST 2009] This is a test
----

To exit the program, simply hit ^C.

// TODO there are actually too many irrelevant events right now.
// Need to turn off heartbeat when using one shot option.

==== `text`

We can also specify other sources of events. For example, if we
wanted to read a text file where each line represents a new event, we
could run the following command.

----
$ bin/flume dump 'text("README.html")'
----

This command reads the file and then outputs each line as a new event.

NOTE: You can also try this with other files such as `/var/log/messages`,
`/var/log/syslog`, or `/var/log/hadoop/hadoop.log`. However,
Flume must run with appropriate permissions to read the files!

==== `tail`

If we wanted to tail a file instead of just reading it once, we simply
specify a different source: this time we use `tail` instead of `text`.

// TODO (jon) replace with echo >> temp file example.

----
$ bin/flume dump 'tail("testfile")'
----

This pipes data from the file into Flume and then out to the console.

There will be a message that says "File 'testfile' does not currently
exist, waiting for file to appear".

In another terminal, we can create the file and write data to it. The
new data will appear in the Flume node console.

----
$ echo Hello world! >> testfile
----

If we delete the file,

----
$ rm testfile
----

the `tail` source will detect this. If we then recreate the file,
the `tail` source will detect the new file and follow it.

----
$ echo Hello again! >> testfile
----

You should see your new message appear in the Flume node console.

==== Synthetic sources, `synth`

Here's one more example, where we use the `synth` source to generate
events.

----
$ bin/flume dump 'synth(100,10)'
----

You should get 100 events, each with 10 random bytes.

=== Section summary

We talked through some basic sources, but Flume includes several other
event sources. The table below summarizes some of them.


// These tables are kinda gross, had to use asciidoc v8.2.7 style.
.Flume Event Sources
[grid="all"]
`--------------------------------`----------------------------------------------------------------------------------
name                             description
------------------------------------------------------------------------------------------------
`console`                        Stdin console
`text("filename")`               One shot text file source.  One line is one event
`tail("filename")`               Similar to Unix's `tail -F`. One line is one event. Stays open for more data and follows filename if file rotated.
`seqfile("filename")`            Serialized Flume events in Hadoop sequence file format.
`syslogUdp(port)`                Syslog over udp port, port. This is syslog compatible.
`syslogTcp(port)`                Syslog over tcp port, port. This is syslog-ng compatible.
`synth(msg_count,msg_size)`      A source that synthetically generates msg_count random messages of size msg_size
`nonlsynth(msg_count,msg_size)`  A source that synthetically generates msg_count random messages of size msg_size. This converts all `\'\n\'` chars into `\' \'` chars.
------------------------------------------------------------------------------------------------

== Pseudo-Distributed Mode

Flume is intended to be run as a distributed system with processes
spread out across _many_ machines. It can also be run as several
processes on a _single_ machine. We call this ``pseudo-distributed''
mode. This is useful for debugging Flume configurations and getting a
better idea of how Flume components interact.

blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum^{P~mp/&je&je?\ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeFIn the previous section, we had a single Flume node and introduced the&je? blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum4XP~mp/&je>concept of Flume sources. Now, we introduce some new concepts&je? blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumk-P~mp/&je9required for a distributed setup. This include the Flume&je?M blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumDDP~mp/&je?*configuration master* server, the configuration of sources and&je?A blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum.=P~mp/&je/*sinks*, and connecting multiple Flume *nodes*.&je?d blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum>zP~mp/&je&je? blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je-=== Starting pseudo-distributed Flume daemons&je? blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum$(P~mp/&je&je? blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeEThere are two kinds of process in the system: the Flume configuration&je?z> blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumlEP~mp/&jeF*master* and the Flume *node*. 
The Flume configuration master (master&je?A blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum)uP~mp/&jeP~mp/&je &je @1 blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je ----&je @3H blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&|{P~mp/&je $ bin/flume master&je @4] blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum#\P~mp/&je ----&je @6b blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&|{P~mp/&je &je @7F blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je ?Once it is started, one can access the master by pointing a web&je @9#v blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum]P~mp/&je #browser to http://localhost:35871/.&je @;k blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum+0P~mp/&je &je @E blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumTP~mp/&je &je @?2 blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je =This webpage displays the status of the Flume nodes that have&je @AM blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&^( P~mp/&je Bcontacted the master, and shows each node's current configuration.&je @DI blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumzP~mp/&je EWhen we start this up without and Flume nodes running, the status and&je @Ft blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&P~mp/&je configs tables will be empty.&je @V blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je &je @XI blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je DTo start a Flume node, we can just invoking the 
following command in&je @Z+ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum搬P~mp/&je another terminal.&je @r) blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum%P~mp/&je &je @uS blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je ----&je @v7 blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&|{P~mp/&je $ bin/flume node &je @x/ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumǖ'P~mp/&je ----&je @y blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum&|{P~mp/&je&je@z¾ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeCWe can see if the Flume node us up by is by pointing the browser to&je@| blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum P~mp/&jeFthe node's status server, http://localhost:35862/. It should by bring&je@~ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeFup a simple web page with a configuration version (sometime in 1969 or&je@n blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumyP~mp/&jeD1970 depending on you time zone), a source config, and a sink config&je@\ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum3MP~mp/&je+field. 
The status field should be "HELLO".&je@  blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumu_UP~mp/&je&je@M blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je(// TODO It should go from HELLO to IDLE.&je@, blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum~P~mp/&je&je@ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeAWe should also check to make sure that the node has contacted the&je@L blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumuNx P~mp/&jeDmaster by checking the config master page and seeing if the node has&je@/ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum-LP~mp/&jeDshown up in the tables. The node column should have the name of the&je@ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum*)QP~mp/&jeFnode. This should be the same as running `hostname` from Unix prompt.&je@׻ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum7P~mp/&jeAIn the rest of this documentation I will assume the node is named&je@ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumrP~mp/&je `_example_`.&je@\ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksum?%P~mp/&je&je@7 blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je%=== Configuring a node via the master&je@ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&je&je@Ds blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumP~mp/&jeDHaving nodes content the master to get their configuration allows us&je@ blitzwingAckTag/writeahead.00000000.20100204-015814430-0800.seqAckTypemsg AckChecksumWP~mp/&jeBto dynamically change the configuration of nodes without having 
to log in to the remote machine to restart the daemon. We can quickly
change the node's previous configuration to a new one. In this
subsection we show how to configure nodes using the master's web
interface.

If we go to the master's web page and click on the config link, we are
presented with two forms. These are web interfaces for setting a
configuration on the master.
When Flume nodes contact the master,

[grid="all"]
`----------`----------------
Sink:      `console`
----------------------------

If you go back to the master page you will notice that the version
stamp has changed to the current time, and that the src and sink
fields of the config have been updated. The status will eventually
change to "ACTIVE".
When it is, it is ready to receive console traffic.

If you go to the terminal where your Flume node is running, you should
be able to type a few lines and then get output back showing your new
log message.

[grid="all"]
`----------`----------------
Node name: `_example_`
Source:    `text("README.html")`
Sink:      `console`
----------------------------

NOTE: You may need to hit enter in the Flume node console.
(This is a bug that will get fixed.)

Or use these new values to tail a file:

[grid="all"]
`----------`----------------
Node name: `_example_`
Source:    `tail("README.html")`
Sink:      `console`
----------------------------

Great!
We can now change the configuration of different nodes in the
system to gather data from a variety of sources by going through the
master!

=== Introducing sinks

Thus far, we have seen that Flume has a variety of sources that
generate or accept new events that are fed into the system. So far we
have limited where these messages go to the `console` sink. As you
would expect, Flume also provides a wide variety of event *sinks*:
destinations for all the events.

There are many possible destinations for events: to disk, to HDFS, to
the console, or forwarding across the network.
We use the sink
abstraction as an interface for forwarding events to any of these
destinations.

We can now connect different sources to different sinks by
specifying the new configuration and submitting it to the master. For
example, the following configuration would essentially make a copy of
`README.html` (modulo some formatting changes).

[grid="all"]
`----------`----------------
Node name: `_example_`
Source:    `text("README.html")`
Sink:      `text("README.copy")`
----------------------------


WARNING: The `text`, `seqfile` and `dfs` sinks overwrite the target
file if it already exists.

We actually support several sinks that you can choose from. Here is
an abridged listing of sinks to try. (There are more in the appendix!)

WARNING: All `_host_` arguments currently use the IP address or DNS
name of the node as opposed to the Flume node name (set by the
`-n name` option). Unless overridden on the command line, the default
Flume node name is the host name.

.Flume Event Sinks
[grid="all"]
`------------------------------------`----------------------------------------------------------------------------------
name                                 description
------------------------------------------------------------------------------------------------
`null`                               Null sink. Events are dropped.
`console`                            Console sink. Display to console's stdout.
`text("_txtfile_")`                  Textfile sink. Write to text file `_txtfile_`.
`seqfile("_seqfile_")`               Seqfile sink. Write serialized Flume events to local file system file `_seqfile_` in Hadoop's seqfile format.
`dfs("_dfsfile_")`                   DFS seqfile sink. Write serialized Flume events to a dfs path such as `hdfs://namenode/file` or `file:///file` in Hadoop's seqfile format.
`syslogTcp("_host_",_port_)`         Syslog TCP sink. Forward events to `_host_` on TCP port `_port_` in syslog wire format (syslog-ng compatible).
------------------------------------------------------------------------------------------------

WARNING: Using `dfs` has some restrictions. See the Troubleshooting
section for more details.

=== Aggregated Configurations

Using the simple form to configure a handful of machines is quite
manageable, but when we get to tens or hundreds of machines, we would
like to maintain or auto-generate the configuration for all the
machines in a single file.
To do this we provide a mechanism for setting
configurations of many machines in a single aggregated configuration
submission.

Let's start by using configurations from the previous subsection.
Instead of the method we used in "Configuring a node via the
master", we could put the following configuration line into the
"Configure nodes" form and then submit:

----
example : text("README.html") | console ;
----

Or

----
example: text("README.html") | text("README.copy");
----

The general format only adds a little syntax and looks like this:

----
<node1> : <source> | <sink> ;
<node2> : <source> | <sink> ;
<node3> : <source> | <sink> ;
...
----

From here on out, we will use this format to configure nodes.

=== Tiering Flume nodes: Agents and Collectors

A simple network connection is abstractly just another sink. One
would hope that sending events over the network would be easy,

(`-r`). These two extra ports would cause contention problems with
the `_example_` node already running on the machine.
(We'll describe
these options more in the advanced section.)

----
$ bin/flume node -n collector -r -s
----

When we look at the master's web page again, we should eventually see
two nodes: `_example_` and `collector`.

Let's configure `collector` to take on the role of a collector, and
let's set up `_example_` to send data from the console to the collector.
We'll use the aggregated multiple configuration form. The agent uses
the `agentSink`, a high-reliability network sink.
The collector
node's source is configured to be a `collectorSource`, and its sink is
configured to be the `console` for now.

----
example : console | agentSink ;
collector : collectorSource(35863) | console ;
----

Now when you type lines in `_example_`'s console, events are