////////////////////
Licensed to Cloudera, Inc. under one or more contributor license
agreements. See the NOTICE file distributed with this work for
additional information regarding copyright ownership. Cloudera, Inc.
licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the
License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License.
////////////////////

== Appendix

=== Flume Source Catalog

.Flume's Tiered Event Sources

These sources and sinks are actually translated from their original specification into compositions of more specific, lower-level configurations. They generally have reasonable default arguments assigned by the properties xml file or by the Master. The defaults can be overridden by the user.

[horizontal]
+collectorSource[(_port_)]+ :: Collector source. Listens for data from agentSinks forwarding to port +_port_+. If +_port_+ is not specified, the node's default collector TCP port, 35863, is used. This source registers itself at the Master so that its failover chains can automatically be determined.

+autoCollectorSource+ :: Auto collector source. Creates a logical collector that, when assigned to a physical node, will be included in the list of collectors in a failover chain. This is the collector counterpart to the auto*Chain() sinks. See the Automatic Failover Chains section for additional information.

+logicalSource+ :: Logical source. This source has a port assigned to it by the Master and listens for rpcSink-formatted data.

.Flume's Basic Sources

These sources are untranslated and generally need all of their arguments.

[horizontal]
+null+ :: Null source. Opens, closes, and returns null (last record) on next().

+console+ :: Stdin console source. This is for inputting events as an interactive user and provides features such as edit history and keyboard edit shortcuts. A Flume node must be started with +flume node_nowatch+ -- the watchdog does not allow console input.

+stdin+ :: Stdin source. This is for piping data into a Flume node's standard input data source. A Flume node must be started with +flume node_nowatch+ -- the watchdog does not allow console input.
+
WARNING: Although this can be used as an interactive console, it will hang the Flume node until a newline is entered.

+rpcSource(_port_)+ :: A remote procedure call (RPC) server that is configured to listen on TCP port +_port_+. Supports both the Apache Thrift and Apache Avro RPC frameworks. The type of RPC framework is specified by the +event.rpc.type+ property (THRIFT or AVRO); the default is THRIFT. Note that the same RPC framework is used for rpcSink.

+text("_filename_"[, _format_])+ :: One-time text file source. One event per +\n+-delimited line.

+tail("_filename_"[, _startFromEnd_=false]{, delim="_regex_", delimMode="exclude|prev|next"})+ :: Similar to Unix's +tail+ utility. One line is one event. Generates events for the entire file, then stays open for more data and follows the file name. (For example, if tailing file "foo" and "foo" is moved to "bar" while a new file named "foo" appears, it will finish reading the renamed "bar" file and then start from the beginning of the new "foo".) If the +startFromEnd+ parameter is false, tail will re-read the file from the beginning. If it is true, it will only start reading from the current end of the file. If the last line of a file does not end with a newline character ('\n'), the +tail+ source will only send an event with this last line when the +tail+ is closed. See the section on tailing a file for details on +delim+ and +delimMode+.
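+
For example, a minimal sketch (assuming the data flow specification syntax used elsewhere in this guide; the node name +node1+ and the log path are placeholders) that tails a log file and prints each new line to the console:
+
----
node1 : tail("/var/log/messages", startFromEnd=true) | console ;
----
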
+multitail("_filename_"[, _file2_[, _file3_ ...]])+ :: Like +tail+, but can follow multiple files concurrently.

+tailDir("_dirname_"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]{, delim="_regex_", delimMode="exclude|prev|next"})+ :: Tails all files in a directory +_dirname_+ that match the specified +fileregex+. Be careful: the regex argument requires Java-style escaping of '\' and '\"'. For example, '\w+' must be written as "\\w+". If a new file appears, it is added to the list of files to tail. If pointed at a new directory, it will attempt to read all files that match! If the +startFromEnd+ parameter is false, tail will re-read each file from the beginning. If it is true, it will only start reading from the current end of each file. If the +recurseDepth+ parameter is > 0, then tailDir recurses into sub-directories; the value defines the maximum level of a directory below +_dirname_+ in which to tail files. Zero means do not recurse into sub-directories. Note: +fileregex+ is applied to file names only (not including directory names); all directories within the +recurseDepth+ limit are recursed into. See the section on tailing a file for details on +delim+ and +delimMode+.

+seqfile("_filename_")+ :: Read from a Hadoop sequence file formatted file, with +com.cloudera.flume.handlers.hdfs.WriteableEventKey+ keys and +com.cloudera.flume.handlers.hdfs.WriteableEvent+ values. Conveniently, this source can read files generated by the seqfile sink.

+syslogUdp(_port_)+ :: Syslog over UDP port +_port_+. This is syslog compatible.

+syslogTcp(_port_)+ :: Syslog over TCP port +_port_+. This is syslog-ng compatible. This is a server that can listen on and receive from many concurrent connections.

+syslogTcp1(_port_)+ :: Syslog over TCP port +_port_+. This is syslog-ng compatible. This accepts only a single connection and shuts down afterwards.

+execPeriodic("_cmdline_", _ms_)+ :: Execute an arbitrary program specified by +_cmdline_+. The entire output of the execution becomes the body of a generated message. +_ms_+ specifies the number of milliseconds to wait before the next execution (and next event). Ideally the program is short lived. This does not process shell pipes or redirection operations -- for these, write a script and use the script as the +_cmdline_+ argument.

+execStream("_cmdline_")+ :: Execute an arbitrary program specified by +_cmdline_+. Each line of output becomes a new event. Ideally the program is long lived. This does not process shell pipes or redirection operations -- for these, write a script and use the script as the +_cmdline_+ argument.

+exec("_cmdline_"[, _aggregate_=false[, _restart_=false[, _period_=0]]])+ :: Execute an arbitrary program specified by +_cmdline_+. If the +_aggregate_+ argument is +true+, the entire program output is considered one event; otherwise, each line is considered a new event. If the +_restart_+ argument is +true+, then the program is restarted after it exits, after waiting +_period_+ milliseconds. +execStream("foo")+ is equivalent to +exec("foo", false, false, 0)+, and +execPeriodic("foo", 1000)+ is equivalent to +exec("foo", true, true, 1000)+.
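+
As a rough sketch (the command and node name are placeholders), the following treats each complete run of +uptime+ as one event and re-runs it every 5 seconds, which is equivalent to +execPeriodic("uptime", 5000)+:
+
----
node1 : exec("uptime", true, true, 5000) | console ;
----
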
+execPeriodic("foo", 1000)+ is equivalent to +exec("foo", true, true, 1000)+ +synth(_msgCount_,_msgSize_)+ :: A source that synthetically generates +_msgCount_+ random messages of size +_msgSize_+. This will generate non printable characters. +synthrndsize(_msgCount_,_minSize_,_maxSize_)+ :: A source that synthetically generates +_msgCount_+ random messages of size between randomly _minSize_ and _maxSize_. This will generate non printable characters. +nonlsynth(_msgCount_,_msgSize_)+:: A source that synthetically generates +_msgCount_+ random messages of size +_msgSize_+. This converts all `'\n'` chars into `' '` chars. This will generate non-printable characters but since all randomly generated '\n' are converted, sources dependent on '\n' as a record separator can get uniformly sized data. +asciisynth(_msgCount_,_msgSize_)+:: A source that synthetically generates +_msgCount_+ random messages of size +_msgSize_+. This converts all `'\n'` chars into `' '` chars, and all non ASCII characters into printable ASCII characters. +twitter("_username_","_pw_"[,"_url_"])+ :: (Unsupported) A source that collects data from a twitter "spritzer" stream. +_username_+ is a twitter username, +_pw_+ is the password for the user, and +_url_+ is the url for the feed. If not specified, +http://stream.twitter.com/1/statuses/sample.json+ is used by default the +_url_+. See http://apiwiki.twitter.com/Streaming-API- Documentation for more details. +irc("_server_",_port_, "_nick_","_chan_")+ :: (Unsupported) An IRC channel source. Each line sent to the channel is a new event. It attempts to connect to +_server_+ on TCP port +_port_+ (standard is 6667). When it connects it attempts to take the nickname +_nick_+, and enter channel +_chan_+ (like +#hadoop+ ). +scribe[(+_port_+)] :: A scribe source. This provides a network socket that is compatible with data generated by Facebook's Scribe collection system. +report[(periodMillis)]+ :: This source polls the local physical node for its report every _periodMillis_ milliseconds and turns it into a new event. The attribute names seen from the node report page are present, and the values are uninterpreted arrays of bytes. === Flume Sinks Catalog .Flume's Collector Tier Event Sinks [horizontal] +collectorSink("_fsdir_","_fsfileprefix_"[, _rollmillis_[, _format_]])+ :: Collector sink. +_fsdir_+ is a fs directory URI such as +hdfs://namenode/path+ or +file:/// path+. +_fsfileprefix_+ is a file name prefix for outputted files. Both of these can use escape sequences documented to bucket data as documented in the *Output Bucketing* section. +_rollmillis_+ is the number of milliseconds between when a HDFS file should be rolled (opened and closed). The format for data outputted by collectors is specified by the +flume.collector.output.format+ property. .Flume's Agent Tier Event Sinks [horizontal] +agentSink[("_machine_"[, _port_])]+ :: Defaults to +agentE2ESink+ +agentE2ESink[("_machine_"[, _port_])]+ :: Agent sink with write ahead log and end-to-end ack. Optional arguments specify a +_machine_+, and the TCP +_port_+ pointing to a +collectorSource+. If none is specified, the values specified by the +flume.collector.event.host+ and the +flume.collector.port+ properties will be used. +agentDFOSink[("_machine_"[, _port_])]+ :: DiskFailover Agent sink that stores to local disk on detected failure. This sink periodically checks with the +_machine:port_+ and resends events if becomes alive again. 
+agentBESink[("_machine_"[, _port_])]+ :: Best-effort agent sink. This drops messages on failure and continues sending. Optional arguments specify a +_machine_+ and the TCP +_port_+ pointing to a +collectorSource+. If none is specified, the values of the +flume.collector.event.host+ and +flume.collector.port+ properties are used.

+agentE2EChain("_m1_[:_p1_]"[, "_m2_[:_p2_]"[, ...]])+ :: Agent sink with write-ahead log, end-to-end ack, and collector failover chains. +_m1:p1_+ specifies the machine and optional port of the primary default collector. If all failovers are exhausted due to failures then, since data is already durable locally, it backs off attempts to send downstream. Optional arguments specify a list of failover machine:port pairs in ranked order. If the primary collector is not responding, the backups are used. The primary collectors are checked periodically to see if they have come back up.

+agentDFOChain("_m1_[:_p1_]"[, "_m2_[:_p2_]"[, ...]])+ :: Disk-failover agent sink that first attempts to fail over to other collectors. +_m1:p1_+ specifies the machine and optional port of the primary default collector. If all failovers are exhausted due to failures, it stores to local disk. Optional arguments specify a list of failover machine:port pairs in ranked order. If the primary collector is not responding, the backups are used. The primary collectors are checked periodically to see if they have come back up.

+agentBEChain("_m1_[:_p1_]"[, "_m2_[:_p2_]"[, ...]])+ :: Best-effort agent sink with collector failover chains. +_m1:p1_+ specifies the machine and optional port of the primary default collector. If all failovers are exhausted due to failures, this drops messages. Optional arguments specify a collector and the TCP port of the collector. If none is specified, the values of the +flume.collector.event.host+ and +flume.collector.port+ properties are used.

+autoE2EChain+ :: This sink is an +agentE2EChain+ that has its failover nodes populated automatically by the master.

+autoDFOChain+ :: This sink is an +agentDFOChain+ that has its failover nodes populated automatically by the master.

+autoBEChain+ :: This sink is an +agentBEChain+ that has its failover nodes populated automatically by the master.

.Flume's Logical Sinks
+logicalSink("_logicalnode_")+ :: This sink creates an rpcSink that is assigned a host and port based on the name of a logical node. This information is maintained and automatically selected by the master.

.Flume's Basic Sinks
[horizontal]
+null+ :: Null sink. Events are dropped.

+console[(_format_)]+ :: Console sink. Displays events on the process's stdout using the optionally specified output format.

+text("_txtfile_"[, _format_])+ :: Text file sink. Writes to the text file +_txtfile_+, using an optionally specified output +_format_+. If the file already exists, this sink will attempt to overwrite it.

+seqfile("_filename_")+ :: Seqfile sink. Writes to a Hadoop sequence file formatted file, with +com.cloudera.flume.handlers.hdfs.WriteableEventKey+ keys and +com.cloudera.flume.handlers.hdfs.WriteableEvent+ values. If the file already exists, this sink will attempt to overwrite it.

+dfs("_hdfspath_")+ :: Hadoop dfs seqfile sink. Writes to a dfs path in the Flume-specific Hadoop seqfile record format. The +_hdfspath_+ can use escape sequences to bucket data, as documented in the *Output Bucketing* section.
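+
For example, a sketch of a collector writing date-bucketed seqfiles (the path is a placeholder, and the +%Y-%m-%d+ escapes are assumed to be among those described in the *Output Bucketing* section):
+
----
collector1 : collectorSource(35863) | dfs("hdfs://namenode/flume/%Y-%m-%d/") ;
----
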
+formatDfs("_hdfspath_"[, _format_])+ :: Hadoop dfs formatted file sink. The +_hdfspath_+ string is *not* escaped. Writes to the dfs path using the specified output +_format_+.

+escapedFormatDfs("_hdfspath_", "_file_"[, _format_])+ :: Hadoop dfs formatted file sink. The +_hdfspath_+ string is escaped, and events are written to particular directories and file names based on this string. Writes to the dfs path using the specified output +_format_+. The +_hdfspath_+ can use escape sequences to bucket data, as documented in the *Output Bucketing* section.

+customdfs("_hdfspath_"[, _format_])+ :: Hadoop dfs formatted file sink. The +_hdfspath_+ string is *not* escaped. Writes to the dfs path using the specified output +_format_+. This sink is being deprecated in favor of +formatDfs+.

+escapedCustomDfs("_hdfspath_", "_file_"[, _format_])+ :: Hadoop dfs formatted file sink. The +_hdfspath_+ string is escaped, and events are written to particular directories and file names based on this string. Writes to the dfs path using the specified output +_format_+. The +_hdfspath_+ can use escape sequences to bucket data, as documented in the *Output Bucketing* section. This sink is being deprecated in favor of +escapedFormatDfs+.

+rpcSink("_host_"[, _port_])+ :: A remote procedure call (RPC) sink that is configured to send to machine +_host_+ on TCP port +_port_+. The default port is 35861 and can be overridden by setting the +flume.collector.event.port+ property. Supports both the Apache Thrift and Apache Avro RPC frameworks. The type of RPC framework is specified by the +event.rpc.type+ property (THRIFT or AVRO); the default is THRIFT. Note that the same RPC framework is used for rpcSource.

+syslogTcp("_host_"[, _port_])+ :: Syslog TCP sink. Writes to host +_host_+ on port +_port_+ in syslog-over-TCP format (syslog-ng compatible). The default port is TCP 514.

+irc("_host_", _port_, "_nick_", "_chan_")+ :: (Unsupported) An IRC channel sink. Each event is sent to the channel as a line. It attempts to connect to +_host_+ on TCP port +_port_+. When it connects, it attempts to take the nickname +_nick_+ and enter channel +_chan_+ (like +#hadoop+).

=== Flume Sink Decorator Catalog

.Flume's Sink Decorators
[horizontal]
+nullDeco+ :: This decorator just passes data through to its child sink.

+writeAhead(...)+ :: Write-ahead decorator. Provides durability by writing events to disk before sending them. This can be used as a buffering mechanism -- receive and send are decoupled into different threads.

+ackedWriteAhead[(_maxmillis_)]+ :: Write-ahead decorator that adds acknowledgement tags and checksums to events. Provides durability by writing events to disk before sending them. This can be used as a buffering mechanism -- receive and send are decoupled into different threads. This decorator generates and tracks groups of events, and also notifies other components to check for acknowledgements. The checks for retries use an exponential backoff that can top out at +_maxmillis_+ milliseconds. The default value for +_maxmillis_+ is the +flume.agent.logdir.maxage+ property.

+diskFailover[(_maxmillis_)]+ :: Disk failover decorator. Events that enter this decorator are sent to its sub-sink. In the event of a downstream error, data is written to disk, and these disk-buffered events are periodically retried. The checks for retries use an exponential backoff that can top out at +_maxmillis_+ milliseconds. The default value for +_maxmillis_+ is the +flume.agent.logdir.maxage+ property.
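+
A minimal sketch of wrapping a network sink with this decorator, using the +{ deco => sink }+ decorator syntax used elsewhere in this guide (host name and port are placeholders):
+
----
agent1 : tail("/var/log/messages") | { diskFailover => rpcSink("collector1", 35863) } ;
----
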
+ackInjector+ :: This decorator injects an extra ack-group-start message on open, tags appended events with an ack tag, and injects an extra ack-group-end message on close. These tags contain a checksum computed over the bodies of all the events that pass through the +ackInjector+.

+ackChecker+ :: This decorator tracks the ack-group start, end, and checksum values inserted by +ackInjector+. If a complete group has arrived and its checksum is correct, it sends notifications to other components.

+lazyOpen+ :: This decorator tracks the open/closed state of the sub-sink but does not actually open the sink until an append is called. Thus, if this decorator is opened and closed without any appends, the sub-sink is never opened.

+insistentOpen[(_max_[, _init_[, _cumulativeMax_]])]+ :: An insistent open attempts to open its sub-sink multiple times until it succeeds, with the specified backoff properties. This is useful for starting a network client when the network server may not yet be up. When an attempt to open the sub-sink fails, this decorator backs off exponentially and then retries the open. +_max_+ is the maximum number of millis per backoff (default is Integer.MAX_VALUE). +_init_+ is the initial number of millis to back off on the first encountered failure (default 1000). +_cumulativeMax_+ is the maximum backoff allowed from a single failure before an exception is forwarded (default is Integer.MAX_VALUE). Note that this synchronously blocks the open call until the open succeeds or fails after +_cumulativeMax_+ millis.

+stubbornAppend+ :: A stubborn append normally passes append operations through to its sub-sink. It catches the first exception that the sub-sink's append method triggers, and then closes, opens, and re-appends to the sub-sink. If this second attempt fails, it throws an exception. This is useful in conjunction with network sinks where connections can be broken. The open/close retry attempt is often sufficient to re-establish the connection.

+value("_attr_", "_value_"{, escape=true|false})+ :: The value decorator adds a new metadata attribute +_attr_+ with the value +_value_+. Agents can mark their data with specific tags for later demultiplexing. By default, the value the user entered is attached verbatim. By setting escape=true, the value is interpreted and escape sequences are replaced with values from the event's attribute list.

+mask("_attr1_"[, "_attr2_", ...])+ :: The mask decorator outputs events modified so that all metadata *except* the specified attributes pass through.

+select("_attr1_"[, "_attr2_", ...])+ :: The select decorator outputs events modified so that *only* the specified metadata attributes pass through.

+digest("_algorithm_", "_attr_", base64="boolean")+ :: The digest decorator calculates a message digest of the event body and writes this value (as bytes) to the +_attr_+ attribute. The valid algorithms are those valid for +java.security.MessageDigest+ and include MD5, SHA-1, SHA-256, SHA-384, and SHA-512. It can optionally base64-encode the digest value (defaults to false).

+format("_pattern_")+ :: The format decorator outputs events modified so that their bodies are replaced with an escaped version of the +_pattern_+ argument. Since checksums rely on body data, this should only be used on unreliable flows or reporting flows. Inappropriate use may result in message loss.
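+
As an illustration of composing several of the decorators above (a sketch only; the host name, port, and "dc" tag are placeholders):
+
----
agent1 : tail("/var/log/messages") | { value("dc", "east") => { insistentOpen => { stubbornAppend => rpcSink("collector1", 35863) } } } ;
----
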
+exDate("_attr_","_pattern_" [, "_prefix_" [, "_padding_"]])+ :: Parse a date string from the _attr_ value of the event, using the _pattern_ as a reference of how to parse it. The major values of the date will be assigned to new event attributes using "date" as a prefix if no _prefix_ is provided. To add or remove zero padding there is the _padding_ variable. By default zero padding is on. Example of zero padding 2010-1-1 becomes 2010-01-01. More info on how to construct the _pattern_ can be found here http://download-llnw.oracle.com/javase/1.4.2/docs/api/java/text/SimpleDateFormat.html Do note that _attr_ is an already extracted date string from the body of the event. exDate doesn't attempt to extract it from the body for you. Example output attributes would be dateday, datemonth, dateyear, datehr, datemin, datesec. Where _date_ is the _prefix_. +regex("_regex_",idx,"_attr_")+ :: The regex decorator applies the regular expression _regex_, extracts the _idx_ th capture group, and writes this value to the _attr_ attribute. The regex must use java-style escaping. Thus a regexs that want to use the +\d+ macro need to be specified as "\\d". regexAll("_regex_", "_name_" [, "_name_"]*)+ :: Applies the regular expression _regex_ to the event body, and assignes all pattern groups found to each provided _name_. +split("_regex_",idx,"_attr_")+ :: The split decorator uses the regular expression _regex_ to split the body into tokens (not including the splitter value). The _idx_ is then written as the value to the _attr_ attribute. The regex must use java-style escaping. Thus, a regex that wants to use the +\d+ macro must be specified as "\\d". +batch(_n_,_maxlatency_)+ :: buffers _n_ events and then sends one aggregate event. If _maxlatency_ millis have passed, all current buffered events are sent out as an aggregate event. +unbatch+ :: Unbatch takes an aggregate event generated by batch, splits it, and then forwards its original events. If an event is not an aggregate it is just forwarded. +gzip+ :: gzips a serialized event. This is useful when used in conjunction with aggregate events. +gunzip+ :: gunzip's a gzip'ed event. If the event is not a gzip event, it is just forwarded. +intervalSampler(_n_)+ :: Interval sampler. Every +_n_+ th event gets forwarded. +probSampler(_p_)+ :: Probability sampler. Every event has a probability _p_ (where 0.0 ≤ _p_ ≤ 1.0) chance of being forwarded. +reservoirSampler(_k_)+ :: Reservoir sampler. When flushed, at most _k_ events are forwarded. If more than _k_ elements have entered this decorator, exactly _k_ events are forwarded. All events that pass through have the same probability of being selected. NOTE: This will reorder the events being sent. +delay(_ms_)+ :: adds a _ms_ millisecond delay before forwarding events down the pipeline. This blocks and prevents other events from entering the pipeline. This is useful for workload simulation in conjunction with +asciisynth+ sources. +choke[(_choke-id_)]+ :: Limits the transfer rate of data going into the sink. The +choke-id+ should have been registered on the physical node where this decorator is being created using the +setChokeLimit+ command. Refer to *Limiting Data Transfer Rate between Source-Sink pairs* section for more details. include::Environment[] === flume-site.xml configuration settings include::Troubleshooting[]