////////////////////
Licensed to Cloudera, Inc. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Cloudera, Inc. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////////////////////
== Using Data Collected by Flume
The first goal of Flume is to collect data and reliably write it to HDFS.
Once data arrives, you want the ability to control where and in what format
the data is stored. Flume provides basic output control mechanisms via the
properties configuration and the dataflow language. These give you control
over the output format and output bucketing of incoming data, and simplify
integration with other HDFS data consumers such as Hive and HBase.
Here are some example use cases:
* When monitoring a web server, you want to bucket logs based on time,
the page hit, and the browser being used.
* When tracking particular data nodes, you want to bucket logs based on
time and the data node name.
* When tracking a feed of JIRA tickets from the Apache feed, you want
to group based on the project identifier or a particular person.
* When collecting data from scribe sources, you want to bucket
data based on the event's category information.
To support these kinds of features, Flume uses a simple data model,
provides a mechanism for bucketing events, and also provides basic
extraction operations for specifying custom bucketing discriminators.
=== The Data Model of a Flume Event
A Flume event has these six main fields:
* Unix timestamp
* Nanosecond timestamp
* Priority
* Source host
* Body
* Metadata table with an arbitrary number of attribute value pairs.
All events are guaranteed to have all of these elements. However, the
body may have zero length, and the metadata table can be empty.
The Unix timestamp is measured in milliseconds and is the Unix timestamp from
the source machine. The nanosecond timestamp is a machine-specific nanosecond
counter, also from the source machine. It is safe to assume that the nanotime
from a machine is monotonically increasing -- i.e., if event A has a larger
nanotime than event B from the same machine, event A was initially received
after event B.
Currently the priority of a message can have one of 6 values: TRACE, DEBUG,
INFO, WARN, ERROR, or FATAL. These values are often provided by logging
systems such as syslog or log4j.
The source host is the name of the machine or its IP address (whatever the
+hostname+ call returns).
The body is the raw log entry body. By default, the body is truncated to a
maximum of 32KB per event. This limit is configurable and can be changed by
modifying the 'flume.event.max.size.bytes' property.
Finally there is the metadata table which is a map from a string attribute
name to an arbitrary array of bytes. This allows for custom bucketing
attributes and will be described in more depth in the Advanced Usage section
of this guide.
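The data model above can be sketched as a plain Java class. This is a
simplified illustration only -- the class and field names here are
hypothetical and do not match Flume's actual +Event+ API:

----
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the six-field Flume event model described above.
// Names are illustrative, not Flume's real interface.
public class SimpleEvent {
    long timestampMillis;  // Unix timestamp (ms) from the source machine
    long nanos;            // machine-specific monotonic nanosecond counter
    String priority;       // one of TRACE, DEBUG, INFO, WARN, ERROR, FATAL
    String host;           // source host name or IP
    byte[] body;           // raw log entry; may have zero length
    // metadata table; may be empty
    Map<String, byte[]> metadata = new HashMap<>();

    public SimpleEvent(long timestampMillis, long nanos, String priority,
                       String host, byte[] body) {
        this.timestampMillis = timestampMillis;
        this.nanos = nanos;
        this.priority = priority;
        this.host = host;
        this.body = body;
    }
}
----

Note that every field is always present: an event with no payload still
carries a zero-length body and an empty metadata table.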
=== Output Bucketing
You can control the output of events to particular directories or files based
on the values of an event's fields. To enable this, Flume provides an escaping
mechanism that routes output data to a particular path.
For example, here is an output spec:
----
collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")
----
// TODO describe the actual tags values when they are solidified
The first argument is the directory where data is to be written. The second
is a filename prefix for the files in which events are written. Suppose you
get an event from a machine called server1 generated at time 18:58. The
events would get written to HDFS with namenode +namenode+, in a directory
called /flume/webdata/1800/, with files named server1-xxx, where xxx is some
extra data for unique file names.
What happened here? Flume replaced the '%H' with a string that represents the
hour of the timestamp found in the event's data. Likewise, the '%\{host\}'
was replaced with the hostname field from the event.
What happens if the server1's message had been delayed and the message wasn't
sent downstream until 19:05? Since the value of the timestamp on the event
was during the 18:00 hour, the event would be written into that directory.
.Event data escape sequences
[horizontal]
%\{host\} :: host
%\{nanos\} :: nanos
%\{timestamp\} :: timestamp
%\{priority\} :: priority string
%\{body\} :: body
%% :: a '%' character.
%t :: Unix time in millis
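The attribute escapes above amount to a simple template substitution. Here
is a minimal sketch of how such an escaping mechanism might work -- this is
an illustration written for this guide, not Flume's actual implementation:

----
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EscapeDemo {
    // Replace %{name} escapes in a template with values from the map.
    // Unknown names expand to the empty string in this sketch.
    static String expand(String template, Map<String, String> fields) {
        Pattern p = Pattern.compile("%\\{(\\w+)\\}");
        Matcher m = p.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = fields.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields =
            Map.of("host", "server1", "priority", "INFO");
        // prints server1-INFO
        System.out.println(expand("%{host}-%{priority}", fields));
    }
}
----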
Because bucketing by date is a frequently requested feature, there are escape
sequences that allow finer-grained control over date and time values.
Here is another output spec:
----
collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-")
----
This would create directories for each day, each with a subdirectory for each
hour with filenames prefixed "web-".
.Fine-grained escape sequences for dates and times
[horizontal]
%a :: locale's short weekday name (Mon, Tue, ...)
%A :: locale's full weekday name (Monday, Tuesday, ...)
%b :: locale's short month name (Jan, Feb,...)
%B :: locale's long month name (January, February,...)
%c :: locale's date and time (Thu Mar 3 23:05:25 2005)
%d :: day of month (01)
%D :: date; same as %m/%d/%y
%H :: hour (00..23)
%I :: hour (01..12)
%j :: day of year (001..366)
%k :: hour ( 0..23)
%l :: hour ( 1..12)
%m :: month (01..12)
%M :: minute (00..59)
%P :: locale's equivalent of am or pm
%s :: seconds since 1970-01-01 00:00:00 UTC
%S :: second (00..60)
%y :: last two digits of year (00..99)
%Y :: year (2010)
%z :: +hhmm numeric timezone (for example, -0400)
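Most of the escapes above follow the familiar strftime conventions. As a
rough illustration (again, not Flume's own code), the directory from the
earlier +%Y-%m-%d/%H00+ example can be derived from an event's millisecond
timestamp with +SimpleDateFormat+:

----
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketDemo {
    // Turn an event's Unix timestamp (ms) into a %Y-%m-%d/%H00 style
    // bucket path. Illustrative only; Flume does this internally.
    static String bucketDir(long timestampMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd/HH'00'");
        // Fixed zone so the example is deterministic.
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(timestampMillis));
    }

    public static void main(String[] args) {
        // 2010-07-01 18:58:00 UTC in epoch milliseconds
        System.out.println(bucketDir(1278010680000L)); // prints 2010-07-01/1800
    }
}
----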
=== Output Format
Now that you have control over where files go, this section describes how
you can control the output format of the data. There are two ways to do
this: set a default value in +flume-site.xml+, or specify the output format
on particular sinks.
==== Configuring the default output format.
You can set the default output format by setting the
'flume.collector.output.format' property in the +flume-site.xml+
file. The output formats are:
.Output formats
[horizontal]
avro :: Avro native file format. The default is currently uncompressed.
avrodata :: Binary-encoded data written in the Avro binary format.
avrojson :: JSON-encoded data generated by Avro.
default :: a debugging format.
json :: JSON-encoded data.
log4j :: a log4j pattern similar to the CDH output pattern.
raw :: Event body only. This is most similar to copying a file but
does not preserve any uniquifying metadata like host/timestamp/nanos.
syslog :: a syslog-like text output format.
seqfile :: the binary Hadoop SequenceFile format, with WritableEventKeys keys and WritableEvent values.
Here is an example of a property added to the +flume-site.xml+ file:
----
<property>
  <name>flume.collector.output.format</name>
  <value>avrojson</value>
  <description>This is the output format for the data written to the
  collector. There are several formats available:
    avro - Avro native file format. The default is currently uncompressed.
    avrodata - this outputs data as an avro binary encoded data
    avrojson - this outputs data as json encoded by avro
    debug - this is a format for debugging
    json - this outputs data as json
    log4j - outputs events in a pattern similar to Hadoop's log4j pattern
    raw - Event body only. This is most similar to copying a file but
      does not preserve any uniquifying metadata like host/timestamp/nanos.
    seqfile - this is the hadoop sequence file format with
      WritableEventKeys and WritableEvent objects.
    syslog - outputs events in a syslog-like format
  </description>
</property>
----
==== Setting output format in specific sinks.
Some sinks have an optional output format argument. These include the
+console+, +text+, +customdfs+/+formatDfs+,
+escapedCustomDfs+/+escapedFormatDfs+, and +collectorSink+.
The signatures below show where the optional format argument appears.
----
collectorSink( "dfsdir","prefix"[, rollmillis[, format]])
text("file"[,format])
formatDfs("hdfs://nn/file" [, format])
escapedFormatDfs("hdfs://nn/file" [, format])
----
This mechanism is more flexible because it allows for output formats
with arguments. Currently, the +seqfile+ output format supports an
argument specifying the sequence file's internal compression codec.
The codecs available are the same as those available to Hadoop --
generally these include +gzip+ and +bzip2+, and can include others that
have been plugged in, such as +lzo+ or +snappy+.
The compression codec is specified by adding an argument to the output
format. For example, the +seqfile+ output format accepts a compression
codec argument, so to specify an output format that is a sequence file
using the bzip2 codec, the output format would be +seqfile("bzip2")+. To
write bzip2-compressed sequence files using a +formatDfs+ sink, you would
specify +formatDfs("hdfs://nn/dir/file", seqfile("bzip2"))+.
NOTE: The old syntax, which specified the output format as a Flume string
(ex: +console("avrojson")+, +console("seqfile")+), has been deprecated but
will be supported for a few more versions. The preferred argument is
now a "function" (ex: +console(avrojson)+,
+console(seqfile("bzip2"))+).
=== Small Files Versus High Latency
For all versions of Hadoop's file system earlier than 0.20.x,
HDFS has write-once, read-many semantics. Thus, the only way to
reliably flush an HDFS file is to close the file. Moreover, once a
file is closed, no new data can be appended to it. This presents
a tension between getting data written quickly to HDFS and
potentially having many small files (which are a potential scalability
bottleneck of HDFS).
On one side, to minimize the load and the data stored throughout the system,
ideally one would flush data to HDFS as soon as it arrives. But flushing
frequently conflicts with storing data to HDFS efficiently, because it
could result in many small files, which eventually will stress an HDFS
namenode. A compromise is to pick a reasonable trigger that has a collector
close "reasonably-sized" files (ideally larger than a single HDFS block, 64MB
by default).
When Flume is deployed at a scale where data collection volumes are small, it
may take a long time to reach the ideal minimum file size (a block size,
typically 64MB). For example, if a single web server produces 10KB of logs a
second (approx. 100 hit logs/s at 100B per log), it will take about 2 hours
(roughly 6400 seconds) before an ideal file size can be reached.
In these situations, you should lean towards having more small files. Small
files cause a few problems downstream, including potential scaling limitations
of Hadoop's HDFS and performance penalties when using MapReduce's default
input-processing mechanisms within Hadoop.
The following sections describe two mechanisms to mitigate these
potential problems:
* Rolling up many small data files into larger batches
* Using a CombinedFileInputFormat
This particular problem becomes less of an issue as the scale of logging
goes up. If a hundred machines were generating the same amount of logs, you
would reach reasonable file sizes every 64 seconds.
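The arithmetic behind these figures can be checked directly. This is a
back-of-the-envelope sketch; the 64MB block size and 100B-per-log rate are
the assumptions stated in the text (integer division makes the results land
near, not exactly on, the rounded figures above):

----
public class FillTimeDemo {
    // Seconds needed for `servers` machines, each producing
    // `bytesPerSec`, to fill one HDFS block of `blockBytes`.
    static long secondsToFillBlock(long blockBytes, int servers,
                                   long bytesPerSec) {
        return blockBytes / (servers * bytesPerSec);
    }

    public static void main(String[] args) {
        long block = 64L * 1024 * 1024; // default HDFS block size
        long rate = 100 * 100;          // 100 logs/s at 100B = 10KB/s

        // One server: 6710 s, i.e. roughly the 2 hours quoted above.
        System.out.println(secondsToFillBlock(block, 1, rate));
        // A hundred servers: 67 s, close to the "64 seconds" above.
        System.out.println(secondsToFillBlock(block, 100, rate));
    }
}
----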
Future versions of Hadoop will mitigate this problem by providing a flush/sync
operation for currently open HDFS files (a patch is already slated for
Hadoop HDFS 0.21.x).
=== Compression for Files Written to HDFS
Flume supports basic compression for all log files that are written to
HDFS. Compressed files are automatically suffixed with an extension
and follow the same naming format + directory structure as regular log
files.
If GzipCodec is selected, ".gz" is appended to the file name; if
BZip2Codec is selected, ".bz2" is appended.
NOTE: SequenceFiles (+seqfile+) and Avro Data Files (+avrodata+) support
internal compression, so you should use that instead, and not
specify +flume.collector.dfs.compress.codec+.
----
<property>
  <name>flume.collector.dfs.compress.codec</name>
  <value>None</value>
  <description>Writes formatted data compressed in specified codec to
  dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec,
  or any other codec Hadoop is aware of.</description>
</property>
----
///////////////
START OF COMMENTED OUT SECTION
==== Rolling up Date into Larger Batches
TODO (jon) Henry writes up this section.
=== Integrating Flume Data Directly with Map Reduce
MapReduce by default assigns a map job to each individual file specified as
input. In the situation with many small files, one can use the
CombineFileInputFormat as the input format class of a MapReduce job.
More details can be found here:
http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html
Here's an example code snippet from a Java MapReduce job submission.
----
...
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
...
JobConf job = new JobConf();
job.setInputFormat(CombineFileInputFormat.class);
----
// TODO (jon) this is not done yet.
=== Integrating Flume Data with Map Reduce Streaming
// does the streaming command allow for user specified inputformats?
----
bin/hadoop jar build/hadoop-streaming.jar --inputformat org.apache.hadoop.mapred.lib.CombineFileInputFormat ...
----
=== Integrating Hive with Flume Data (experimental)
Use avrojson output format, use Hive Avro serde (in Hive 0.x.x).
Use other format and make regexes to parse data.
END OF COMMENTED OUT SECTION
///////////////