////////////////////
Licensed to Cloudera, Inc. under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. Cloudera, Inc. licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
////////////////////

== Using Data Collected by Flume

The first goal of Flume is to collect data and reliably write it to HDFS. Once data arrives, you want the ability to control where and in what format it is stored. Flume provides basic output control mechanisms via its properties configuration and its dataflow language. These give you control over the output format and the output bucketing of incoming data, and simplify integration with other HDFS data consumers such as Hive and HBase.

Here are some example use cases:

* When monitoring a web server, you want to bucket logs based on time, the page hit, and the browser being used.
* When tracking particular data nodes, you want to bucket logs based on time and the data node name.
* When tracking a feed of JIRA tickets from the Apache feed, you want to group based on the project identifier or a particular person.
* When collecting data from scribe sources, you want to bucket data based on the event's category information.

To support these kinds of features, Flume uses a simple data model, provides a mechanism for bucketing events, and also provides basic extraction operations for specifying custom bucketing discriminators.

=== The Data Model of a Flume Event

A Flume event has six main fields:

* Unix timestamp
* Nanosecond timestamp
* Priority
* Source host
* Body
* Metadata table with an arbitrary number of attribute/value pairs

All events are guaranteed to have all of these elements; however, the body may have zero length, and the metadata table can be empty.

The Unix timestamp is measured in milliseconds and is taken from the source machine. The nanosecond timestamp is a machine-specific nanosecond counter, also from the source machine. It is safe to assume that the nanotime from a machine is monotonically increasing -- i.e., if event A has a larger nanotime than event B from the same machine, event A was initially received after event B.

Currently the priority of a message can have one of six values: TRACE, DEBUG, INFO, WARN, ERROR, or FATAL. These values are often provided by logging systems such as syslog or log4j.

The source host is the name of the machine or its IP address (whatever the hostname call returns).

The body is the raw log entry body. The default is to truncate the body to a maximum of 32KB per event. This is a configurable value and can be changed by modifying the 'flume.event.max.size.bytes' property.

Finally, there is the metadata table, which is a map from a string attribute name to an arbitrary array of bytes. This allows for custom bucketing attributes and will be described in more depth in the Advanced Usage section of this guide.
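To make the model concrete, here is a hypothetical event shown field by field. The values are invented for illustration and are not rendered in any particular Flume output format:

----
timestamp : 1278243300000                   // Unix time in milliseconds
nanos     : 209485698243                    // machine-local nanosecond counter
priority  : INFO
host      : web01.example.com
body      : 192.168.1.10 - - "GET /index.html HTTP/1.1" 200 1043
metadata  : { }                             // attribute/value pairs; empty here
----

An event like this one could be bucketed by hour and by host using the escape sequences described in the next section.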
=== Output Bucketing

You can control the output of events to particular directories or files based on the values of an event's fields. To enable this, Flume provides an escaping mechanism that is used to build the output path. For example, here is an output spec:

----
collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")
----

// TODO describe the actual tags values when they are solidified

The first argument is the directory where data is to be written. The second is a prefix for the files where events are written. Suppose you get an event from a machine called server1, generated at time 18:58. The event would be written to HDFS with namenode +namenode+, in a directory called /flume/webdata/1800/, in files named server1-xxx, where xxx is some extra data used to make file names unique.

What happened here? Flume replaced the '%H' with a string that represents the hour of the timestamp found in the event's data. Likewise, the '%\{host\}' escape was replaced with the host field from the event. What happens if server1's message had been delayed and wasn't sent downstream until 19:05? Since the value of the timestamp on the event was during the 18:00 hour, the event would still be written into that directory.

.Event data escape sequences
[horizontal]
%\{host\} :: host
%\{nanos\} :: nanos
%\{timestamp\} :: timestamp
%\{priority\} :: priority string
%\{body\} :: body
%% :: a '%' character.
%t :: Unix time in millis

Because bucketing by date is a frequently-requested feature, there are escape sequences that give finer control over date and time values. Here is another output spec:

----
collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-")
----

This would create a directory for each day, each containing a subdirectory for each hour, with file names prefixed "web-".

.Fine grained escape sequences for dates and times
[horizontal]
%a :: locale's short weekday name (Mon, Tue, ...)
%A :: locale's full weekday name (Monday, Tuesday, ...)
%b :: locale's short month name (Jan, Feb,...)
%B :: locale's long month name (January, February,...)
%c :: locale's date and time (Thu Mar 3 23:05:25 2005)
%d :: day of month (01)
%D :: date; same as %m/%d/%y
%H :: hour (00..23)
%I :: hour (01..12)
%j :: day of year (001..366)
%k :: hour ( 0..23)
%l :: hour ( 1..12)
%m :: month (01..12)
%M :: minute (00..59)
%P :: locale's equivalent of am or pm
%s :: seconds since 1970-01-01 00:00:00 UTC
%S :: second (00..60)
%y :: last two digits of year (00..99)
%Y :: year (2010)
%z :: +hhmm numeric timezone (for example, -0400)

=== Output Format

Now that you have control over where files go, this section describes how you can control the output format of the data. There are two ways to do this. The first is to set a default value in +flume-site.xml+; the other is to specify an output format on particular sinks.

==== Configuring the default output format.

You can set the default output format by setting the 'flume.collector.output.format' property in the +flume-site.xml+ file. The output formats are:

.Output formats
[horizontal]
avro :: Avro Native file format. Default currently is uncompressed.
avrodata :: Binary encoded data written in the avro binary format.
avrojson :: JSON encoded data generated by avro.
default :: a debugging format.
json :: JSON encoded data.
log4j :: a log4j pattern similar to that used by the CDH output pattern.
raw :: Event body only. This is most similar to copying a file but does not preserve any uniquifying metadata like host/timestamp/nanos.
syslog :: a syslog-like text output format.
seqfile :: the binary Hadoop SequenceFile format, with WritableEventKeys keys and WritableEvent values.

Here is an example of a property added to the +flume-site.xml+ file:

----
<property>
<name>flume.collector.output.format</name>
<value>avrojson</value>
<description>This is the output format for the data written to the
collector. There are several formats available:
avro - Avro Native file format. Default currently is uncompressed.
avrodata - this outputs data as an avro binary encoded data
avrojson - this outputs data as json encoded by avro
debug - this is a format for debugging
json - this outputs data as json
log4j - outputs events in a pattern similar to Hadoop's log4j pattern
raw - Event body only. This is most similar to copying a file but
does not preserve any uniquifying metadata like host/timestamp/nanos.
seqfile - this is the hadoop sequence file format with
WritableEventKeys and WritableEvent objects.
syslog - outputs events in a syslog-like format
</description>
</property>
----

==== Setting output format in specific sinks.

Some sinks take an optional output format argument. These include +console+, +text+, +customdfs+/+formatDfs+, +escapedCustomDfs+/+escapedFormatDfs+, and +collectorSink+. In these cases, we will be focusing on the optional format argument.

----
collectorSink("dfsdir","prefix"[, rollmillis[, format]])
text("file"[,format])
formatDfs("hdfs://nn/file" [, format])
escapedFormatDfs("hdfs://nn/file" [, format])
----

This mechanism is more flexible because it allows for output formats that take arguments. Currently, the +seqfile+ output format supports an argument that specifies the sequence file's internal compression codec. The codecs available are the same as those available to Hadoop -- generally these include +gzip+ and +bzip2+, and can include others that have been plugged in, such as +lzo+ or +snappy+.

The compression codec is specified by adding an argument to the output format. For example, the +seqfile+ output format accepts a compression codec argument, so to specify a sequence file that uses the bzip2 codec, the output format would be +seqfile("bzip2")+. To write bzip2-compressed sequence files using a +formatDfs+ sink, you would specify +formatDfs("hdfs://nn/dir/file", seqfile("bzip2"))+.

NOTE: The old syntax, which specified the output format as a string (ex: +console("avrojson")+, +console("seqfile")+), has been deprecated but will be supported for a few more versions. The preferred argument is now a "function" (ex: +console(avrojson)+, +console(seqfile("bzip2"))+).
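The bucketing escapes and the per-sink output format argument can be combined. The following spec is only a sketch (the path, prefix, and roll interval are invented for illustration), but it follows the +collectorSink+ signature shown above: it buckets events by day and hour and writes them as bzip2-compressed sequence files.

----
collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-", 30000, seqfile("bzip2"))
----

Note that, per the signature, the format argument can only be given when a roll interval (+rollmillis+, here 30000 milliseconds) is also supplied.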
=== Small Files Compared to High Latency

In all versions of Hadoop's file system earlier than 0.20.x, HDFS has write-once read-many semantics. Thus, the only way to reliably flush an HDFS file is to close the file. Moreover, once a file is closed, no new data can be appended to it. This presents a tension between getting data written to HDFS quickly and potentially ending up with many small files (which is a potential scalability bottleneck of HDFS).

On one side, to minimize the load and the data stored throughout the system, you would ideally flush data to HDFS as soon as it arrives. However, flushing frequently conflicts with storing data in HDFS efficiently, because it can result in many small files, which will eventually stress the HDFS namenode. A compromise is to pick a reasonable trigger that has a collector close "reasonably-sized" files (ideally larger than a single HDFS block, 64MB by default).
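One such trigger is the roll interval, the optional +rollmillis+ argument to +collectorSink+ shown in the Output Format section. As a sketch (the path, prefix, and interval here are purely illustrative), a collector that closes and reopens its output files every 30 minutes could be written as:

----
collectorSink("hdfs://namenode/flume/%Y-%m-%d/", "bulk-", 1800000)
----

Assuming the interval is given in milliseconds, as the +rollmillis+ name suggests, 1800000 corresponds to 30 minutes; a longer interval gives each file more time to approach the ideal block size before it is closed.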
When Flume is deployed at a scale where data collection volumes are small, it may take a long time to reach the ideal minimum file size (a block size, typically 64MB). For example, if a single web server produces 10KB of logs a second (approximately 100 hit logs per second at 100 bytes per log), it will take about 6400 seconds (nearly two hours) before an ideal file size can be reached. In these situations, lean towards having more small files.

Small files cause a few problems downstream. These include potential scaling limitations of Hadoop's HDFS, and performance penalties when using MapReduce's default input processing mechanisms within Hadoop. The following sections describe two mechanisms to mitigate these potential problems:

* Rolling up many small data files into larger batches
* Using a CombinedFileInputFormat

This particular problem becomes less of an issue as the scale of logging goes up. If a hundred machines were generating the same amount of logs, you would reach a reasonable file size every 64 seconds. Future versions of Hadoop will mitigate this problem by providing a flush/sync operation for currently open HDFS files (a patch is already slated for Hadoop HDFS 0.21.x).

== Compression for files written to HDFS

Flume supports basic compression for all log files written to HDFS. Compressed files are automatically suffixed with an extension and follow the same naming format and directory structure as regular log files. If GzipCodec is selected, ".gz" is appended to the file name; if BZip2Codec is selected, ".bz2" is appended.

NOTE: SequenceFiles (+seqfile+) and Avro Data Files (+avrodata+) support internal compression, so you should use that instead and not specify +flume.collector.dfs.compress.codec+.

----
<property>
<name>flume.collector.dfs.compress.codec</name>
<value>None</value>
<description>Writes formatted data compressed in specified codec to
dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec,
or any other Codec Hadoop is aware of</description>
</property>
----

///////////////
START OF COMMENTED OUT SECTION

==== Rolling up Data into Larger Batches

TODO (jon) Henry writes up this section.

=== Integrating Flume Data Directly with Map Reduce

By default, MapReduce assigns a map task to each individual file specified as input. When there are many small files, you can use CombineFileInputFormat as the input format class of a MapReduce job. More details can be found here:

http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html

Here is an example code snippet from a Java MapReduce job submission:

----
...
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
...
JobConf job = new JobConf();
job.setInputFormat(CombineFileInputFormat.class);
----

// TODO (jon) this is not done yet.

=== Integrating Flume Data with Map Reduce Streaming

// does the streaming command allow for user specified inputformats?

----
bin/hadoop jar build/hadoop-streaming.jar --inputformat org.apache.hadoop.mapred.lib.CombineFileInputFormat ...
----

=== Integrating Using Hive with Flume Data (experimental)

Use avrojson output format, use Hive Avro serde (in Hive 0.x.x). Use other format and make regexes to parse data.

END OF COMMENTED OUT SECTION
//////////////
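As an example of selecting an external codec, the following is a hypothetical +flume-site.xml+ entry (the value is one of those listed in the property description above) that makes the collector gzip its output files, which will then carry the ".gz" suffix:

----
<property>
<name>flume.collector.dfs.compress.codec</name>
<value>GzipCodec</value>
</property>
----

As the note above says, for +seqfile+ and +avrodata+ output you should prefer their internal compression instead of this property.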