.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ========================================== Flume 1.2.0 Developer Guide ========================================== Introduction ============ Overview -------- Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. Apache Flume is a top level project at the Apache Software Foundation. There are currently two release code lines available, versions 0.9.x and 1.x. This documentation applies to the 1.x codeline. Please click here for `the Flume 0.9.x Developer Guide `_. Architecture ------------ Data flow model ~~~~~~~~~~~~~~~ A unit of data flow is called event which is a byte payload that is accompanied by an optional set of string attributes. Flume agent is a process (JVM) that hosts the components that flows events from an external source to next destination. .. figure:: images/DevGuide_image00.png :align: center :alt: Agent component diagram A source consumes events delivered to it by an external source like web server in a specific format. For example, an Avro source can be used to receive Avro events from clients or other agents in the flow. When a source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until its consumed by a sink. An example of channel is the JDBC channel that uses a file-system backed embedded database. The sink removes the event from channel and puts it into an external repository like HDFS or forwards it to the source in next hop of the flow. The source and sink within the given agent run asynchronously with the events staged in the channel. Reliability ~~~~~~~~~~~ The events are staged in the channel on each agent. Then they are delivered to the next agent or terminal repository (like HDFS) in the flow. The events are removed from the channel only after they are stored in the channel of next agent or in the terminal repository. This is a how the single-hop message delivery semantics in Flume provide end-to-end reliability of the flowFlume uses transactional approach to guarantee the reliable delivery of the events. The sources and sinks encapsulate the store/retrieval of the events in a transaction provided by the channel. This ensures that the set of events are reliably passed from point to point in the flow. In case of multi hop flow, the sink on previous hop and source on next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop. Building Flume -------------- Getting the source ~~~~~~~~~~~~~~~~~~ Check out the code using Subversion. Click here for `the SVN repository root `_. The Flume 1.x development happens under the branch "trunk" so this command line can be used:: svn checkout http://svn.apache.org/repos/asf/flume/trunk flume-trunk Alternatively, if you prefer using Git, you may use:: git clone git://git.apache.org/flume.git cd flume git checkout trunk Compile/test Flume ~~~~~~~~~~~~~~~~~~ The Flume build is mavenized. You can compile Flume using the standard Maven commands: #. Compile only: ``mvn clean compile`` #. Compile and run unit tests: ``mvn clean test`` #. Run individual test(s): ``mvn clean test -Dtest=,,... -DfailIfNoTests=false`` #. Create tarball package: ``mvn clean install`` #. Create tarball package (skip unit tests): ``mvn clean install -DskipTests`` Developing custom components ---------------------------- Client ~~~~~~ The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Currently flume supports Avro, log4j and syslog as ways to transfer data from remote source. Additionally there’s an Exec source that can consume the output of a local process as input to Flume. It’s quite possible to have a use case where these existing options are not sufficient. In this case you can build a custom mechanism to send data to Flume. There are two ways of achieving this. First is to create a custom client that communicates to one of the flume’s existing sources like Avro or syslog. Here the client should convert it’s data into messages understood by these Flume sources. The other option is to write a custom Flume source that directly talks to your existing client application using some IPC or RPC protocols, and then convert the data into flume events to send it upstream. Client SDK '''''''''' Though flume contains a number of built in mechanisms to ingest data, often one wants the ability to communicate with flume directly from a custom application. The Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC. RPC Client interface '''''''''''''''''''' The is an interface to wrap the user data data and attributes into an ``Event``, which is Flume’s unit of flow. This encapsulates the RPC mechanism supported by Flume. The application can simply call ``append()`` or ``appendBatch()`` to send data and not worry about the underlying message exchanges. Avro RPC Client ''''''''''''''' As of Flume 1.1.0, Avro is the only support RPC protocol. The ``NettyAvroRpcClient`` implements the ``RpcClient`` interface. The client needs to create this object with the host and port of the Flume agent and use it to send data into flume. The following example shows how to use the Client SDK API: .. code-block:: java import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; public void myInit () { // setup the RPC connection to Flume agent at hostname/port RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port); ... } public void sendDataToFlume(String data) { // Create flume event object Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); try { rpcClient.append(event); } catch (EventDeliveryException e) { // clean up and recreate rpcClient rpcClient.close(); rpcClient = null; rpcClient = RpcClientFactory.getDefaultInstance(hostname, port); } ... } public void cleanUp () { // close the rpc connection rpcClient.close(); ... } Failover handler '''''''''''''''' This class wraps the Avro RPC client to provide failover handling capability to clients. This takes a list of host/ports of the Flume agent. If there’s an error in communicating the current agent, then it automatically falls back to the next agent in the list: .. code-block:: java // Setup properties for the failover Properties props = new Properties(); props.put("client.type", "default_failover"); // list of hosts props.put("hosts", "host1 host2 host3"); // address/port pair for each host props.put("hosts.host1", host1 + ":" + port1); props.put("hosts.host1", host2 + ":" + port2); props.put("hosts.host1", host3 + ":" + port3); // create the client with failover properties client = (FailoverRpcClient); RpcClientFactory.getInstance(props); Transaction interface ~~~~~~~~~~~~~~~~~~~~~ The ``Transaction`` interface is the basis of reliability for Flume. All the major components ie. sources, sinks and channels needs to interface with Flume transaction. .. figure:: images/DevGuide_image01.png :align: center :alt: Transaction sequence diagram The transaction interface is implemented by a channel implementation. The source and sink connected to channel obtain a transaction object. The sources actually use a channel selector interface that encapsulate the transaction (discussed in later sections). The operations to stage or extract an event is done inside an active transaction. For example: .. code-block:: java Channel ch = ... Transaction tx = ch.getTransaction(); try { tx.begin(); ... // ch.put(event) or ch.take() ... tx.commit(); } catch (ChannelException ex) { tx.rollback(); ... } finally { tx.close(); } Here we get hold of a transaction from a channel. After the begin method is executed, the event is put in the channel and transaction is committed. Sink ~~~~ The purpose of a sink to extract events from the channel and forward it to the next Agent in the flow or store in an external repository. A sink is linked to a channel instance as per the flow configuration. There’s a sink runner thread that’s get created for every configured sink which manages the sink’s lifecycle. The sink needs to implement ``start()`` and ``stop()`` methods that are part of the ``LifecycleAware`` interface. The ``start()`` method should initialize the sink and bring it to a state where it can forward the events to its next destination. The ``process()`` method from the ``Sink`` interface should do the core processing of extracting the event from channel and forwarding it. The ``stop()`` method should do the necessary cleanup. The sink also needs to implement a ``Configurable`` interface for processing its own configuration settings: .. code-block:: java // foo sink public class FooSink extends AbstractSink implements Configurable { @Override public void configure(Context context) { some_Param = context.get("some_param", String.class); // process some_param … } @Override public void start() { // initialize the connection to foo repository .. } @Override public void stop () { // cleanup and disconnect from foo repository .. } @Override public Status process() throws EventDeliveryException { // Start transaction ch = getChannel(); tx = ch.getTransaction(); try { tx.begin(); Event e = ch.take(); // send the event to foo // foo.some_operation(e); tx.commit(); sgtatus = Status.READY; (ChannelException e) { tx.rollback(); status = Status.BACKOFF; } finally { tx.close(); } return status; } } } Source ~~~~~~ The purpose of a Source is to receive data from an external client and store it in the channel. As mentioned above, for sources the ``Transaction`` interface is encapsulated by the ``ChannelSelector``. Similar to ``SinkRunner``, there’s a ``SourceRunner`` thread that gets created for every configured source that manages the source’s lifecycle. The source needs to implement ``start()`` and ``stop()`` methods that are part of the ``LifecycleAware`` interface. There are two types of sources, pollable and event-driven. The runner of pollable source runner invokes a ``process()`` method from the pollable source. The ``process()`` method should check for new data and store it in the channel. The event driver source needs have its own callback mechanism that captures the new data: .. code-block:: java // bar source public class BarSource extends AbstractSource implements Configurable, EventDrivenSource{ @Override public void configure(Context context) { some_Param = context.get("some_param", String.class); // process some_param … } @Override public void start() { // initialize the connection to bar client .. } @Override public void stop () { // cleanup and disconnect from bar client .. } @Override public Status process() throws EventDeliveryException { try { // receive new data Event e = get_some_data(); // store the event to underlying channels(s) getChannelProcessor().processEvent(e) } catch (ChannelException ex) { return Status.BACKOFF; } return Status.READY; } } Channel ~~~~~~~ TBD