////////////////////
Licensed to Cloudera, Inc. under one or more contributor license
agreements.  See the NOTICE file distributed with this work for
additional information regarding copyright ownership.  Cloudera, Inc.
licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the
License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.  See the License for the specific language governing
permissions and limitations under the License.
////////////////////

== Advanced Flume Usage

This section describes in further detail how to automate the control of Flume nodes via the FlumeShell, takes a deeper dive into Flume's dataflow specification language and the internals of its reliability mechanisms, and explains how to manipulate event metadata and how to install source and sink plugins.

include::FlumeShell[]

=== Flume's Dataflow Specification Language

Using the Flume node roles (collector, agent) is the simplest way to get up and running with Flume. Under the covers, these sources and sinks (collectorSink, collectorSource, and agentSource) are actually composed of primitive sinks that have been augmented with *translated components* with role defaults, *special sinks*, and *sink decorators*. These components make configuration more flexible, but also more complicated. The combination of special sinks and decorators exposes many details of the underlying mechanisms, but it is a powerful and expressive way to encode rich behavior.

Flume lets users enter their own composition of sinks, sources, and decorators using a domain-specific dataflow language, summarized by the following grammar. The sections below describe these constructs in more detail.

------------------------------------------------------------------------------------
nodeName      ::= NodeId

simpleSource  ::= SourceId args?

simpleSink    ::= SinkId args?
decoratorSink ::= DecoId args?

source ::= simpleSource

sink   ::= simpleSink                                              // single sink
         | [ sink (, sink)* ]                                      // fanout sink
         | { decoratorSink => sink }                               // decorator sink
         | decoratorSink1 decoratorSink2 ... decoratorSinkN sink   // decorator sink
         | < sink ? sink >                                         // failover / choice sink
         | roll(...) { sink }                                      // roll sink
         | collector(...) { sink }                                 // generic collector sink

logicalNode ::= NodeId : source | sink ;

spec ::= (logicalNode)*
------------------------------------------------------------------------------------

==== Special Sinks: Fan out, Fail over, and Roll

Three special sinks are FanOutSinks, FailoverSinks, and RollSinks.

Fanout sinks send every incoming event to all of the sinks specified as their children. These can be used for data replication or for processing data off of the main reliable data flow path. Fanout is similar to the Unix +tee+ command and logically acts like an AND operator: the event is sent to each subsink.

The syntax for a FanOutSink is:

----
[ console, collectorSink ]
----
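For example, a complete logical node specification that replicates each event both to the console and to a collector might look like the following sketch (the node name +agent1+, the tailed file +"foo"+, and the collector name +"collector1"+ are illustrative placeholders):

----
agent1 : tail("foo") | [ console, logicalSink("collector1") ] ;
----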
FailoverSinks are used to handle failures when appending new events. FailoverSinks can be used to specify alternate collectors to contact in the event the primary collector fails, or a local disk sink to store data until the primary collector recovers. Failover is similar to exception handling and logically acts like an OR operator: if a failover is successful, one of the subsinks has received the event.

The syntax for a FailoverSink is:

----
< logicalSink("collector1") ? logicalSink("collector2") >
----

So, you could configure node "agent1" to fail over to collector2 if collector1 fails (for example, if the connection to `collector1` goes down or if `collector1`'s HDFS becomes full):

----
agent1 : source | < logicalSink("collector1") ? logicalSink("collector2") > ;
----

A roll sink opens and closes a new instance of its subsink every _millis_ milliseconds. A roll is an atomic transition that closes the current instance of the subsink and then opens a new instance of it for appending data. A special +%{rolltag}+ attribute is available as an escape sequence to differentiate data from different roll periods. This can be used to make every roll produce a new file with a unique name.

The syntax for a roll sink is:

----
roll(millis) sink
----

These can be composed to express even richer behavior. For example, this sink outputs to the console and has a failover collector node:

----
[ console, < logicalSink("collector1") ? logicalSink("collector2") > ]
----

This one rolls the collector every 1000 milliseconds, writing to a different HDFS file after each roll:

----
roll(1000) [ console, escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]
----

==== Introducing Sink Decorators

Fan out and failover affect where messages go in the system but do not modify the messages themselves. To augment or filter events as they pass through the dataflow, you can use *sink decorators*. Sink decorators can add properties to the sink and can modify the data streams that pass through them. For example, you can use them to increase reliability via write-ahead logging, increase network throughput via batching and compression, sample, benchmark, and even perform lightweight analytics.

The following simple sampling example uses an intervalSampler that is configured to send every 10th element from source "source" to the sink "sink":

----
flumenode: source | intervalSampler(10) sink;
----

NOTE: There is an older, more verbose syntax for specifying decorators that wraps decorators and sinks with +{ deco => sink }+ syntax. The old syntax remains valid.

Here's an example that batches every 100 events together:

----
flumenode: source | batch(100) sink;
----

Like fanout and failover, decorators are also composable. Here is an example that creates batches of 100 events and then compresses them before moving them off to the sink:

----
flumenode: source | batch(100) gzip sink;
----

As of v0.9.3, there is a new, more generic collector sink that better encapsulates the "collectorMagic" functionality. It allows you to specify arbitrary sinks (or multiple sinks) using the Flume specification language. So the equivalent of

----
collectorSink("xxx","yyy-%{rolltag}", 15000)
----

is now:

----
collector(15000) { escapedCustomDfs("xxx","yyy-%{rolltag}") }
----

Here is an example of newly expressible functionality:

----
collector(15000) { [ escapedCustomDfs("xxx","yyy-%{rolltag}"), hbase("aaa", "bbb-%{rolltag}"), elasticSearch("eeee","ffff") ] }
----

This would send data to all three destinations, and only send an ack if all three were successful. Combined with some recovery decorators (stubborn*, insistent*), this can be used to precisely express potentially complex failure recovery policies.
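As a sketch of such a policy (assuming v0.9.3 or later; the decorator ordering, HDFS URI, and file name are illustrative), a collector that keeps retrying to open its HDFS sink and retries failed appends might be written as:

----
collector(15000) { insistentOpen stubbornAppend escapedCustomDfs("hdfs://namenode/flume", "data-%{rolltag}") }
----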
==== Translations of High-level Sources and Sinks

Internally, Flume translates high-level sinks into compositions of simpler decorators, failovers, and rollers that add the desired properties. The proper compositions create pipelines that provide different levels of reliability in Flume.

NOTE: This section describes how agents work, but this version of Flume does not expose these translations the same way the auto*Chains are exposed. A future version of Flume will expose these details. The exact translations are still under development.

Suppose you have nodes using the different agent sinks:

----
node1 : tail("foo") | agentE2ESink("bar");
node2 : tail("foo") | agentDFOSink("bar");
node3 : tail("foo") | agentBESink("bar");
----

In the translation phase, these agent sinks are actually converted into the following Flume sinks:

----
node1 : tail("foo") | ackedWriteAhead lazyOpen stubbornAppend logicalSink("bar") ;
node2 : tail("foo") | < lazyOpen stubbornAppend logicalSink("bar") ? diskFailover insistentOpen lazyOpen stubbornAppend logicalSink("bar") >;
node3 : tail("foo") | lazyOpen stubbornAppend logicalSink("bar");
----

+ackedWriteAhead+ is actually a complicated decorator that internally uses rolls and some other special decorators. The decorator interface allows you to manually specify wire batching and compression options. For example, you could compress every 100 messages using gzip compression:

----
node1 : tail("foo") | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink("bar");
----

+collectorSink("xxx","yyy",15000)+ is also a bit complicated, with some custom decorators to handle acks. Under the covers, however, it depends on a roller with an escapedCustomDfs sink inside of it:

----
roll(15000) { collectorMagic escapedCustomDfs("xxx", "yyy-%{rolltag}") }
----

Another place translations happen is with logical nodes. Let's start with a few nodes:

----
node1 : tail("foo") | { .... => logicalSink("node2") };
node2 : logicalSource | collectorSink("...");
----

The translation mechanism converts logicalSources and logicalSinks into lower-level physical rpcSources and rpcSinks. Let's assume that node1 is on machine host1 and node2 is on machine host2. After the translation you end up with the following configurations:

----
node1 : tail("foo") | { .... => rpcSink("host2",12345) };
node2 : rpcSource(12345) | collectorSink("...");
----

Suppose that you swap the mapping so that node2 is now on host1 and node1 is on host2:

----
# flume shell commands
exec unmapAll
exec map host1 node2
exec map host2 node1
----

The original configuration will now be translated to:

----
node1 : tail("foo") | { .... => rpcSink("host1",12345) };
node2 : rpcSource(12345) | collectorSink("...");
----

=== Custom Metadata Extraction

While Flume can take in raw data, you can also add structure to data based on the nodes it flowed through, and filter out portions of the data so that only the portion actually needed is carried downstream.

The simplest decorator is the +value+ decorator. It takes two arguments: an attribute name and a value to add to the event's metadata. This can be used to annotate constant information about the source, or to attach arbitrary data to events from a particular node. It could also be used to naively track the provenance of data through Flume.

==== Extractors

Extraction operations are also available to extract values from logs that have known structure. One example is the +regex+ decorator. It takes three arguments: a regular expression, an index, and an attribute name, and it extracts the specified regex group from the body of the event and writes it as the value of the specified attribute.
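For example, the following sketch extracts the text after "ERROR: " from each event body into an attribute named +errmsg+ before writing the event to the console (the node name, tailed file, pattern, group index, and attribute name are all illustrative):

----
node1 : tail("foo") | regex("ERROR: (.*)", 1, "errmsg") console ;
----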
Similarly, the +split+ decorator also takes three arguments: a regular expression, an index, and an attribute name. It splits the body based on the regular expression, extracts the text group after the given instance of the separator, and writes the value to the specified attribute. One can think of this as a much simplified version of the +awk+ Unix utility.

==== Meta Data Filtering and Transformations

Flume enforces an invariant that prevents the modification of an attribute that has already been written upstream. This simplifies debugging of dataflows and improves the visibility of data when debugging. If many stages are used, however, a lot of extra metadata frequently ends up being moved around. To deal with this, a few extra operations are available.

A +select+ operation is available. This operation is like a SQL select, providing the relational calculus's set projection operation: it modifies an event so that only the specified metadata fields are forwarded.

// TODO Example.

A +mask+ operation is also available that forwards all metadata attributes except for the attributes specified.

// TODO Example.

A +format+ decorator is also available that uses the escape mechanisms to rewrite the body of an event to a user-customizable message. This is useful for outputting summary data to low-volume sinks -- for example, periodically writing summary information out to an IRC channel.

==== Role Defaults

To simplify things for users, you can assign a particular role to a logical node -- think of these as "automatic" specifications that have default settings. Two roles currently provided are the agent role and the collector role. Since there are many nodes with the same role, the set of nodes sharing a role is called a tier. So, for example, the agent tier consists of all the nodes that are in the agent role, and nodes that have the collector role are in the collector tier.

Agent and collector roles have defaults specified in the `conf/flume-site.xml` file. Look at the `conf/flume-conf.xml` file for properties prefixed with `flume.agent.*` and `flume.collector.*` for descriptions of the configuration options.

Each node maintains its own node state and has its own configuration. If the master does not have a data flow configuration for the logical node, the logical node will remain in IDLE state. If a configuration is present, the logical node will attempt to instantiate the data flow and run it concurrently with other data flows.

This means that each machine essentially lives in only one tier. In a more complicated setup, it is possible to have a machine that contains many logical nodes, and because each of these nodes can take on different roles, the machine lives in multiple tiers.

==== Arbitrary Data Flows and Custom Architectures

With thriftSink and thriftSource, data can be sent through multiple nodes. If ack injection and ack checking decorators are properly inserted, you can still achieve reliability.

include::Plugins[]

include::RateLimit[]