View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.inputtools;
20  
21  
22  import java.io.IOException;
23  import java.util.regex.*;
24  import org.apache.hadoop.chukwa.*;
25  import org.apache.hadoop.mapred.*;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.io.*;
28  import org.apache.log4j.Logger;
29  
30  /***
31   * An InputFormat for processing logfiles in Chukwa. Designed to be a nearly
32   * drop-in replacement for the Hadoop default TextInputFormat so that code can
33   * be ported to use Chukwa with minimal modification.
34   * 
35   * Has an optional configuration option, chukwa.inputfilter.datatype which can
36   * be used to filter the input by datatype. If need exists, this mechanism could
37   * be extended to also filter by other fields.
38   * 
39   */
40  @SuppressWarnings("deprecation")
41  public class ChukwaInputFormat extends
42      SequenceFileInputFormat<LongWritable, Text> {
43  
44    public static class ChukwaRecordReader implements
45        RecordReader<LongWritable, Text> {
46  
47      static Logger LOG = Logger.getLogger(ChukwaInputFormat.class);
48  
49      private SequenceFileRecordReader<ChukwaArchiveKey, Chunk> sfrr;
50      private long lineInFile = 0;
51      private Chunk curChunk = null;
52      private int lineInChunk; // outside of next, it's the array offset of next
53                               // line to be returned
54      private int[] lineOffsets = null;
55      private int byteOffsetOfLastLine = 0;
56      Pattern dtPattern;
57  
58      public ChukwaRecordReader(Configuration conf, FileSplit split)
59          throws IOException {
60        sfrr = new SequenceFileRecordReader<ChukwaArchiveKey, Chunk>(conf, split);
61        dtPattern = Pattern
62            .compile(conf.get("chukwa.inputfilter.datatype", ".*"));
63      }
64  
65      @Override
66      public void close() throws IOException {
67        sfrr.close();
68      }
69  
70      @Override
71      public LongWritable createKey() {
72        return new LongWritable();
73      }
74  
75      @Override
76      public Text createValue() {
77        return new Text();
78      }
79  
80      @Override
81      public long getPos() throws IOException {
82        return sfrr.getPos();
83      }
84  
85      @Override
86      public float getProgress() throws IOException {
87        return sfrr.getProgress();
88      }
89  
90      private boolean passesFilters(Chunk c) {
91        return dtPattern.matcher(c.getDataType()).matches();
92      }
93  
94      @Override
95      public boolean next(LongWritable key, Text value) throws IOException {
96        if (curChunk == null) {
97          ChukwaArchiveKey k = new ChukwaArchiveKey();
98          curChunk = ChunkImpl.getBlankChunk();
99          boolean unfilteredChunk = false;
100         while (!unfilteredChunk) {
101           boolean readOK = sfrr.next(k, curChunk);
102           if (!readOK) {
103             curChunk = null;
104             return false;
105           }
106           unfilteredChunk = passesFilters(curChunk);
107         }
108         lineOffsets = curChunk.getRecordOffsets();
109         lineInChunk = 0;
110         byteOffsetOfLastLine = 0;
111       } // end curChunk == null
112       value.set(curChunk.getData(), byteOffsetOfLastLine,
113           lineOffsets[lineInChunk] - byteOffsetOfLastLine);
114       if (lineInChunk >= lineOffsets.length - 1) { // end of chunk
115         curChunk = null;
116       } else
117         byteOffsetOfLastLine = lineOffsets[lineInChunk++] + 1;
118 
119       key.set(lineInFile);
120       lineInFile++;
121       return true;
122     }
123   } // end ChukwaRecordReader
124 
125   @Override
126   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
127       JobConf job, Reporter reporter) throws IOException {
128     reporter.setStatus(split.toString());
129     LOG.info("returning a new chukwa record reader");
130     return new ChukwaRecordReader(job, (FileSplit) split);
131   }
132 
133 }