////////////////////
Licensed to Cloudera, Inc. under one or more contributor license agreements.
See the NOTICE file distributed with this work for additional information
regarding copyright ownership. Cloudera, Inc. licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
////////////////////

=== Extending via Sink/Source/Decorator Plugins

An experimental plugin mechanism is provided that allows you to add new custom sources, sinks, and decorators to the system.

.Two steps are required to use this feature.

. First, add the jar with the new plugin classes to flume's classpath. If the plugin requires DLLs or .so files, make sure these are in the LD_LIBRARY_PATH (Unix .so) or PATH (Windows .dll).

. Second, in +flume-site.xml+, add the class names of the new sources, sinks, and/or decorators to the +flume.plugin.classes+ property. Multiple classes can be specified by comma separating the list.

Java reflection is used to find some special static methods that add the new components to the system and to the data flow language's library.

An example component has been "pluginified" -- the "HelloWorld" source, sink, and decorator. This plugin does something very simple: the source generates the text "hello world!" every three seconds, the sink writes events to a "helloworld.txt" text file, and the decorator prepends "hello world!" to any event it encounters.

. cd into the +plugins/helloworld+ directory and type +ant+; a +helloworld_plugin.jar+ file will be generated.

. Add "helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator" to the +flume.plugin.classes+ property in +flume-site.xml+ (create the file if it doesn't already exist).
+
IMPORTANT: If you use the provided +flume-site.xml.template+ file to create your +flume-site.xml+, be sure to comment out or remove any example properties contained in the sample template.
+
.Example flume-site.xml contents
[source,xml]
----
<configuration>
  <property>
    <name>flume.plugin.classes</name>
    <value>helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator</value>
    <description>Comma separated list of plugins</description>
  </property>
</configuration>
----

. Start the Flume master and at least one logical node in separate terminals.
.. In each terminal, cd into the top-level flume directory; this should be just above +plugins+.
.. Add +helloworld_plugin.jar+ to the FLUME_CLASSPATH in *both* terminals:
+
----
export FLUME_CLASSPATH=`pwd`/plugins/helloworld/helloworld_plugin.jar
----
.. In terminal 1 run +bin/flume master+.
.. In terminal 2 run +bin/flume node -n hello1+.
. At this point the master and hello1 nodes should be started and will have loaded the plugin.
+
.You should see log output similar to the following in both master and hello1:
----
10/07/29 17:35:28 INFO conf.SourceFactoryImpl: Found source builder helloWorldSource in helloworld.HelloWorldSource
10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink builder helloWorldSink in helloworld.HelloWorldSink
10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink decorator helloWorldDecorator in helloworld.HelloWorldDecorator
----
+
TIP: Another way to verify that your plugin is loaded is to check whether it is displayed on the master's extension page, http://localhost:35871/masterext.jsp

. Configure hello1.
+
TIP: The easiest way to do this is to open the configuration page of the master in a browser, typically http://localhost:35871/flumeconfig.jsp

.. Load the helloworld source/sink into the +hello1+ node (the bottom text box, then the submit button, if you are using the master's web interface):
+
----
hello1: helloWorldSource() | helloWorldSink();
----
.. You could also try the hello world decorator:
+
----
hello1: helloWorldSource() | { helloWorldDecorator() => helloWorldSink() };
----
+
In either case +hello1+ will output a +helloworld.txt+ file into its current working directory. Every 3 seconds a new "hello world!" line will be appended to the file.

==== Semantics of Flume Extensions

Flume uses threading to support reconfiguration and multiple logical nodes. Sources, sinks, and decorators are extensions that can use queues for buffering and clock sleeps for periodic rolls and retries. Because these are blocking operations, and because the internal concurrency control mechanisms can cause deadlocks or hangs, there are several rules that need to be followed -- and enforced by test cases -- in order to be a well-behaved source, sink, or decorator.

NOTE: This section is a draft and is not exhaustive or completely formal. More restrictions may be added in the future.

.Semantics of sources

Sources must implement 4 methods:

* void open() throws IOException
* Event next() throws IOException
* void close() throws IOException
* ReportEvent getReport()

Along with these signatures, each of these methods can also throw RuntimeExceptions. These exceptions indicate a failure condition and by default will make a logical node shut down in ERROR state. The error messages are user visible, so it is important that they have actionable descriptions of the failure conditions without having to dump a stack trace. Thus, NullPointerExceptions are not acceptable -- they are not descriptive or helpful without their stack traces. Some examples of valid runtime exceptions include invalid arguments that prevent a source from opening, and invalid state infractions (attempting to call next() on a closed source).

===== Simple source semantics

["graphviz", "sourceStatesSimple.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;
  START -> CLOSED ;
  CLOSED -> OPEN [label="open()"];
  CLOSED -> CLOSED [label="close()"];
  CLOSED -> CLOSED [label="next() throws Exn"];
  OPEN -> OPEN [label="next()"];
  OPEN -> OPEN [label="open() throws Exn"];
  OPEN -> CLOSED [label="close()"];
}
---------------------------------------------------------------------

Simple sources are assumed to be sources whose open and close operations happen quickly and do not block for long periods of time. The longest pauses tolerated here are on the order of 10s (the default time for a failing DNS lookup).
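To make these rules concrete, here is a minimal sketch of a simple source, modeled loosely on the bundled HelloWorld plugin. The base class, the +SourceBuilder+ and +Pair+ types, the import paths, and the +getSourceBuilders()+ registration hook (one of the static methods the plugin loader looks for via reflection) are assumptions that should be checked against the Flume source tree for your version.

.Sketch of a minimal simple source (assumed plugin API)
[source,java]
----
// A sketch only: the imports and base classes below are assumptions modeled
// on the bundled plugins/helloworld example, not a definitive API reference.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.Pair;

public class TickSource extends EventSource.Base {
  private volatile boolean opened = false;
  private long ticks = 0;

  @Override
  public void open() throws IOException {
    // Grab resources and fail fast; a second open() on an OPEN source
    // should throw IOException or IllegalStateException.
    if (opened) {
      throw new IllegalStateException("TickSource is already open");
    }
    opened = true;
  }

  @Override
  public Event next() throws IOException {
    if (!opened) {
      throw new IllegalStateException("next() called on a closed TickSource");
    }
    ticks++;
    return new EventImpl(("tick " + ticks).getBytes());
  }

  @Override
  public void close() throws IOException {
    // Release resources; a blocked next() should unblock and return null.
    opened = false;
  }

  @Override
  public ReportEvent getReport() {
    // Metrics must be readable whether the source is open or closed.
    // (Assumes the base class supplies a basic report and a long metric setter.)
    ReportEvent rpt = super.getReport();
    rpt.setLongMetric("ticks", ticks);
    return rpt;
  }

  // The plugin loader finds this static method via reflection and registers
  // "tickSource" in the dataflow language (name and signature assumed).
  public static List<Pair<String, SourceBuilder>> getSourceBuilders() {
    List<Pair<String, SourceBuilder>> builders =
        new ArrayList<Pair<String, SourceBuilder>>();
    builders.add(new Pair<String, SourceBuilder>("tickSource",
        new SourceBuilder() {
          @Override
          public EventSource build(String... argv) {
            return new TickSource();
          }
        }));
    return builders;
  }
}
----

If the class is packaged into a plugin jar and listed in +flume.plugin.classes+, the hypothetical +tickSource+ name would then be usable in the dataflow language, just as in the HelloWorld walkthrough above.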
The constructor for a source should be callable without grabbing resources that can block or that require IO, such as network connections or file handles. If there are errors due to configuration settings that can be caught in the constructor, an IllegalArgumentException should be thrown.

+open()+ is a call that grabs resources for the source so that the +next()+ call can be made. The +open+ call of a simple source should attempt to fail fast. It can throw an IOException or a RuntimeException such as IllegalStateException. Open should only be called on a CLOSED source -- if a source is opened twice, the second call should throw an IOException or IllegalStateException.

+next()+ is a call that gets an event from an open source. If a source is not open, this should throw an IllegalStateException. This call can and often will block. If +next()+ is blocking, a call to +close()+ should unblock the +next()+ by having it exit cleanly, returning null. Many resources such as TCP sockets (and ideally RPC frameworks) default to throwing an exception on blocked network reads (like a +next()+ call) when +close()+'d.

+close()+ is a call that releases the resources that the source's open call grabs. The +close()+ call itself blocks until all resources are released. This allows a subsequent +open()+ in the same thread to not fail due to resource contention (for example, closing a server socket on port 12345 should not return until port 12345 is ready to be bound again).

+getReport()+ returns a ReportEvent. These values should be available regardless of whether the node is open or closed, and this call should not get blocked by other calls on the source (due to potential lock inversion issues). The values retrieved are ideally grabbed atomically, but this is not required as long as no errors are caused by racy execution. If a source is opened or closed multiple times, it is up to the source to determine whether values are reset or persisted between open/close cycles.

===== Buffered source semantics

["graphviz", "sourceStatesBuffered.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;
  START -> CLOSED ;
  CLOSED -> CLOSED [label="close()"];
  CLOSED -> CLOSED [label="next() throws Exn"];
  CLOSED -> OPEN [label="open()"];
  OPEN -> OPEN [label="next()"];
  OPEN -> OPEN [label="open() throws Exn"];
  OPEN -> CLOSING [label="close()"];
  CLOSING -> CLOSING [label="next()"];
  CLOSING -> CLOSING [label="close()"];
  CLOSING -> CLOSING [label="open() blocks"];
  CLOSING -> CLOSED [label="flushed"];
  CLOSING -> CLOSED [label="failed"];
}
---------------------------------------------------------------------

Some sources have in-memory queues or buffers stored persistently on disk. The guiding principle here is that on +close()+, buffered sources should prevent new data from entering and attempt to flush the buffered data. Any subordinate threads should also be released before close returns. If no progress is being made on +close()+ for a given period of time (30s is the default currently), the controlling thread will issue a +Thread.interrupt()+ call. The source should be able to handle InterruptedExceptions and percolate the interrupted status up the returning call stack.

NOTE: In v0.9.1 and v0.9.2, interruptions, when caught, should be handled by re-flagging the Thread's interrupted flag (a call to +Thread.currentThread().interrupt()+) and then throwing an IOException. The API for extensions will likely change in the future to throw either an IOException or an InterruptedException.
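For a buffered source whose +next()+ blocks on an internal queue, that convention might look like the following sketch; the +queue+ field (a +BlockingQueue<Event>+) is an illustrative detail of this hypothetical source, not part of any Flume API.

.Sketch of the v0.9.1/v0.9.2 interruption convention in a blocking next()
[source,java]
----
// Sketch: a blocking next() on a buffered source following the v0.9.1/v0.9.2
// interruption convention.  The "queue" field is illustrative only.
@Override
public Event next() throws IOException {
  try {
    // Blocks until an event (or a special DONE marker) is available.
    return queue.take();
  } catch (InterruptedException e) {
    // Re-flag the interrupt so callers can see it, then raise IOException.
    Thread.currentThread().interrupt();
    throw new IOException("next() interrupted while waiting for events", e);
  }
}
----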
+open()+ -- no change from the simple source semantics.

+close()+ -- this call is usually made from a different thread than the open() or next() calls. Since this call should block until resources are freed, it should attempt to flush its buffers before returning. For example, if a network source has some buffered data, the network connection should be closed to prevent new data from entering, and then the buffered data should be flushed. The source should be able to handle InterruptedExceptions, percolate the interrupted status up the returning call stack, and indicate an error state.

A +next()+ call while in CLOSING state should continue pulling values out of the buffer until it is empty. This is especially true if the next() call is happening in the main driver thread or a subordinate driver thread. One mechanism to get this effect is to add a special DONE event to the queue/buffer that indicates a clean exit.

+getReport()+ ideally includes metric information such as the size of the queues in the source and the number of elements in the queue.

.Semantics of sinks and decorators

Sinks and decorators must implement 4 methods:

* void open() throws IOException
* void append(Event e) throws IOException
* void close() throws IOException
* ReportEvent getReport()

Along with these signatures, each of these methods can also throw RuntimeExceptions. Run-time exceptions indicate a failure condition and by default will make a logical node shut down in ERROR state. The error messages are user visible, so it is important that they have helpful descriptions of the failure conditions without having to dump a stack trace. Thus, NullPointerExceptions are not acceptable -- they are not descriptive or helpful without their stack traces.

===== Simple sink semantics

["graphviz", "sinkStatesSimple.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;
  START -> CLOSED ;
  CLOSED -> OPEN [label="open()"];
  CLOSED -> CLOSED [label="close()"];
  CLOSED -> CLOSED [label="append() throws Exn"];
  OPEN -> OPEN [label="append()"];
  OPEN -> CLOSED [label="close()"];
  OPEN -> OPEN [label="open() throws Exn"];
}
---------------------------------------------------------------------

Simple sinks are assumed to have open, close, and append operations that happen quickly and do not block for long periods of time. The longest pauses tolerated here are on the order of 10s (the default time for a failing DNS lookup).

The constructor for sinks and decorators should be callable without grabbing resources that can block or that require IO, such as network connections or file handles. If there are errors due to configuration settings that can be caught in the constructor, an IllegalArgumentException should be thrown.

+open()+ is a call that grabs resources for the sink or decorator so that the +append(Event)+ call can be made. If there are errors due to configuration settings that are not detectable without IO in the constructor, +open()+ should attempt to fail fast and throw an IOException or a RuntimeException such as IllegalStateException or IllegalArgumentException. Open should only be called on a CLOSED sink -- if a sink is opened twice, the second call should throw an IOException or IllegalStateException.

+append()+ is a call that delivers an event. If a sink is not open, this should throw an IllegalStateException.

If a normal decorator fails to open or to append because of an internal failure, or because a subsink fails to open, the decorator should release its resources, attempt to close the subsink, and then throw an exception.
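As a sketch of that cleanup rule, a decorator's +open()+ might look roughly like the following. +EventSinkDecorator+ and its delegating +open()+/+close()+ calls are assumptions modeled on the bundled HelloWorld decorator, and +allocateInternalResources()+ is a hypothetical helper standing in for whatever state the decorator itself needs.

.Sketch of a decorator that cleans up when its own setup fails (assumed base class)
[source,java]
----
// Sketch: a decorator that releases its resources and tries to close the
// subsink if its own setup fails after the subsink has already been opened.
// EventSinkDecorator and its delegation to the subsink are assumptions.
public class SafeOpenDecorator<S extends EventSink> extends EventSinkDecorator<S> {

  public SafeOpenDecorator(S sink) {
    super(sink);
  }

  @Override
  public void open() throws IOException {
    super.open();                    // open the subsink first
    try {
      allocateInternalResources();   // hypothetical setup for this decorator
    } catch (IOException e) {
      // Internal failure: release our resources, attempt to close the
      // subsink, then propagate the original exception.
      try {
        super.close();
      } catch (IOException ignored) {
        // best effort -- the original failure is the one worth reporting
      }
      throw e;
    }
  }

  private void allocateInternalResources() throws IOException {
    // placeholder for buffers, files, sockets, etc.
  }
}
----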
There are some sinks/decorators that deliberately manipulate these semantics -- this needs to be done with care.

+close()+ is a call that releases the resources that the sink's open call grabs. If +open()+ or +append()+ is blocking, a call to +close()+ should unblock the call and have it exit. +close()+ should be called on an open sink, but we allow a closed sink to have +close()+ called on it without throwing an exception (generally logging a warning, however).

+getReport()+ returns a ReportEvent. These values should be available regardless of whether the node is open or closed, and this call should not get blocked by other calls (due to potential lock inversion issues). The values retrieved are ideally grabbed atomically, but this is not required as long as no errors are caused by racy execution. If a sink is opened or closed multiple times, it is up to the sink to determine whether values are reset or persisted between open/close cycles.

===== Buffered sink and decorator semantics

["graphviz", "sinkStatesBuffered.png"]
---------------------------------------------------------------------
digraph single_collector {
  rankdir=LR;
  START -> CLOSED ;
  CLOSED -> CLOSED [label="close()"];
  CLOSED -> OPEN [label="open()"];
  CLOSED -> CLOSED [label="append() throws Exn"];
  OPEN -> OPEN [label="append()"];
  OPEN -> OPEN [label="open() throws Exn"];
  OPEN -> CLOSING [label="close()"];
  CLOSING -> CLOSING [label="close()"];
  CLOSING -> CLOSING [label="open() blocks"];
  CLOSING -> CLOSING [label="append() throws Exn"];
  CLOSING -> CLOSED [label="success"];
  CLOSING -> CLOSED [label="failure"];
}
---------------------------------------------------------------------

Some sinks have queues or buffers stored in memory or persistently on disk. The guiding principle here is that buffered sinks should attempt to flush their buffers when prompted to +close()+. This needs to be balanced with the requirement that sinks and decorated sinks should attempt to close in a relatively quick fashion.

+open()+ -- since the sink isn't open yet, there is generally no buffered data. However, an +open()+ on a sink or decorator with persistent data should attempt to recover that data and enqueue it in the +open()+ call. Examples include a DFO or WAL sink recovering dfo/wal logs when a network subsink is down.

An +append()+ call may buffer data before sending it (such as a batching decorator).

A +close()+ call should attempt to append buffered data to its (sub)sink before executing the close. Also, any subordinate threads should be stopped before shutdown. If no progress is being made on +close()+ for a given period of time (30s is the default currently), it will be interrupted and should handle abruptly exiting because of this interruption.
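The flush-on-close behavior might look like the following sketch, again assuming an +EventSinkDecorator+-style base class whose +append()+ and +close()+ delegate to the subsink; the in-memory +ArrayList+ buffer and the class name are purely illustrative.

.Sketch of a buffering decorator that flushes on close (assumed base class)
[source,java]
----
// Sketch: a buffering decorator that flushes queued events to its subsink
// when asked to close().  Imports as in the earlier source sketch, plus the
// assumed com.cloudera.flume.core.EventSink / EventSinkDecorator classes.
public class BufferingDeco<S extends EventSink> extends EventSinkDecorator<S> {
  private final List<Event> buffer = new ArrayList<Event>();

  public BufferingDeco(S sink) {
    super(sink);
  }

  @Override
  public void append(Event e) throws IOException {
    buffer.add(e);                   // hold the event instead of forwarding now
  }

  @Override
  public void close() throws IOException {
    try {
      for (Event e : buffer) {
        super.append(e);             // flush to the subsink before closing
        if (Thread.currentThread().isInterrupted()) {
          // The driver has given up waiting (~30s of no progress):
          // stop flushing and report the failure.
          throw new IOException("interrupted while flushing buffered events");
        }
      }
      buffer.clear();
    } finally {
      super.close();                 // always close the subsink
    }
  }
}
----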
["graphviz", "sinkStatesOpening.png"] --------------------------------------------------------------------- digraph single_collector { rankdir=LR; START -> CLOSED ; CLOSED -> CLOSED [label="close()"]; CLOSED -> OPENING [label="open()"]; OPENING -> CLOSED [label="close()"]; OPENING -> OPEN [label="success"]; OPENING -> CLOSED [label="interrupt"]; OPENING -> OPENING [label="append() blocks"]; OPEN -> OPEN [label="append()"]; OPEN -> OPEN [label="open() throws Exn"]; OPEN -> CLOSED [label="interrupt"]; OPEN -> CLOSED [label="close()"]; } --------------------------------------------------------------------- Retry on open semantics ["graphviz", "sinkStatesAppending.png"] --------------------------------------------------------------------- digraph single_collector { rankdir=LR; START -> CLOSED ; CLOSED -> CLOSED [label="close()"]; CLOSED -> OPEN [label="open()"]; CLOSED -> CLOSED [label="append() throws Exn"]; OPEN -> APPENDING [label="append()"]; APPENDING -> OPEN [label="success"]; APPENDING -> APPENDING [label="open()"]; APPENDING -> APPENDING [label="close() blocks"]; APPENDING -> APPENDING [label="append() blocks"]; OPEN -> OPEN [label="open() throws Exn"]; OPEN -> CLOSED [label="close()"]; OPEN -> CLOSED [label="interrupt"]; APPENDING -> CLOSED [label="interrupt"]; } --------------------------------------------------------------------- Retry on Append semantics Some decorators introduce retries and sleeps. An author who uses these needs to ensure that these operations behave well on open, append, and close. When combined with buffering sinks, flushing a buffer on close may not be possible! (ex: wal trying to send data to a dead network connection). This means these sinks/decos need a way to exit abruptly and report an error. There are two operations that make these cases need to be handled: unbounded retries and unbounded/long sleeps/await/blocking. Some decorators have potentially unbounded retry semantics. For example, InsistentOpen, InsistentAppend, and FailoverSinks have the potential to retry +open()+ and +append()+ calls an unbounded number of times. These decorators can also be wrapped with in others -- this means we need to be able to percolate the hard exit and bypass the retry logic. To do this we require that any sink/deco that has retry logic must check for hard exit before retrying. These sinks must to propagate the hard exit interruption to its upstream decorators (in case they have retry logic!). Some sinks have the potential to backoff or sleep for long or potentially unbounded amounts of time. Code using sleeps or synchronization operations such as waiting on latches (+CountDownLatch.await()+) or thread sleeps (+Thread.sleep()+) must properly handle interruptions. Since these yielding operations are usually only used in retry operations (which meant there was a failure), the sink/deco needs to propagate the interruption and fail with error. There are some ramifications of these semantics. Care must be taken with locking open, close, and append operations. If there are any sleeps or blocking +open()+ operations (ex: InsistentOpen, FailoverSinks), ideally a close call will cause it to shutdown, it and the open call should get unblocked. +append()+ The sink signaled to be closed but blocked on +append()+ or +open()+ should exit in a reasonable amount of time -- ideally within a few heartbeats (5s is the default, so ideally <30s). If the sink exits and its buffers are empty, it should do a normal successful return. 
// NOTE: If you want more information about writing your own plugins,
// please contact flume-dev@cloudera.org.

////////
HIDDEN FOR NOW, THIS IS HALF BAKED

=== Ganglia Reporting

First we need two nodes -- in this example they are run on the same machine.

----
Terminal 1:
$ bin/flume node

Terminal 2:
$ bin/flume node -n localhost
----

We assume the node is called _host_. Next let's map a new logical node called _report_ to localhost.

----
map localhost report
----

Finally let's set the configuration of the three nodes (replacing _host_ with the actual host name).

----
host : asciisynth(0,100) | { delay(250) => agentSink } ;
report : reportPoller | [ console, dganglia("LogicalNodeManager._node_.LogicalNode.Agent.NaiveFileWAL.appendSuccess", "msgs", "long") ];
localhost : collectorSource | collectorSink("file:///tmp/test/%H","blah",5000);
----

This setup synthesizes random ASCII data events and sends an event through the agentSink pipeline every 250 ms. These are received by the +localhost+ node, which writes data into hourly directories. Meanwhile, the +report+ node on +_host_+ is a separate logical node that periodically polls the physical node for reports and sends them to the console and a +dganglia+ sink. The +dganglia+ sink sends the deltas (rate information) between particular values to the ganglia gmond daemon.

==== Experimental Sources and Sinks

.Flume's Plugin Sources
[horizontal]
+amqpPoll(_..._)+ :: AMQP (Advanced Message Queuing Protocol) source based on polling.
+amqpSub(_..._)+ :: AMQP (Advanced Message Queuing Protocol) source based on subscribing.
+pcapfile("_pcap file_")+ :: One-shot pcap file source (packet capture a la Wireshark). One packet is one event.
+pcap("_filterspec_")+ :: Live filtered pcap source (packet capture a la Wireshark). One packet is one event.

.Flume's Plugin Sinks
[horizontal]
+amqp()+ :: AMQP sink. Writes to the specified AMQP broker (this is not present.)

NOTE: Graphing currently uses the Google Charts API and requires the browser to have external network connectivity.

.Flume's Reporter Sinks (Experimental):
[horizontal]
+ganglia("_attr_","_units_","_type_"[,"_gmondservers_"])+ :: This sink reads the absolute value of the attribute +_attr_+ as a type +_type_+, where +_type_+ may be +int+, +long+, +double+ or +string+. One may optionally specify a list of gmond servers to send metric data to.
+dganglia("_attr_","_units_","_type_"[,"_gmondservers_"])+ :: This sink reads the attribute +_attr_+ as a type +_type_+, where +_type_+ may be +int+, +long+, or +double+, and then emits the delta from the previous value. One may optionally specify a list of gmond servers to send metric data to.
+counter("_name_")+ :: Counts the number of events received, and registers its report with the report manager as +_name_+.
+counterHistory("name", ....)+ :: Generates a historical count-versus-time graph.
+multigrep(...)+ :: Makes a histogram that counts the number of events matching a substring grep.
+multigrepspec(...)+ :: Makes a histogram that counts the number of events matching a substring grep (uses a config file instead of inline arguments).
+regexhisto(...)+ :: Uses a regex and builds a histogram based on the matching group's value.
+regexhistospec(...)+ :: Uses a regex and builds a histogram based on the matching group's value (uses a config file instead of inline arguments).

.Flume's Experimental/Debugging Sink Decorators (unsupported)
[horizontal]
+flakeyAppend(_p_)+ :: Each append has a probability +_p_+ (where 0.0 ≤ _p_ ≤ 1.0) of failing and throwing an exception.
+intervalFlakeyAppend(_n_)+ :: Every +_n_+ th element fails by throwing an exception.
+intervalDroppyAppend(_n_)+ :: Every +_n_+ th element is dropped (no exception is thrown).
+inmem+ :: Buffers events in memory and then flushes all of them on close. Good for benchmarking.
+benchinject+ :: Injects an extra benchmark-begin event on open and a benchmark-end event on close.
+benchreport("_name_")+ :: Starts a benchmark when it receives a benchmark-begin event and stops the benchmark when it receives the corresponding benchmark-end event. Benchmark events are not forwarded, but all other events are.
+mult(_n_)+ :: Takes each event and sends _n_ copies of it before sending the next event.

// unused, easier to just remove.
// +latch(...)+ :: (Unsupported) adds a latch that blocks until
// triggered. This is only really useful for debugging and test cases.

===== Deprecated

.Flume's Deprecated Sources

These sources are "too specific" or perform poorly and are thus deprecated (but still available). They will likely be replaced with more generic mechanisms (similar to the output format mechanism).

[horizontal]
+tpriosource(_port_)+ :: Thrift RPC based source on TCP port _port_, prioritized by priority (higher priority first) and then age (older messages first).
+thriftSource(_port_)+ :: Same as the RPC source, but specifically thrift RPC.

.Flume's Deprecated Sinks

Many of these sinks are left over from earlier iterations.

[horizontal]
*None listed here for now*

END OF HIDDEN FOR NOW HALF BAKED
/////////////////