1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.inputtools;
20
21
22 import java.io.IOException;
23 import java.util.regex.*;
24 import org.apache.hadoop.chukwa.*;
25 import org.apache.hadoop.mapred.*;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.io.*;
28 import org.apache.log4j.Logger;
29
30
31
32
33
34
35
36
37
38
39
40 @SuppressWarnings("deprecation")
41 public class ChukwaInputFormat extends
42 SequenceFileInputFormat<LongWritable, Text> {
43
44 public static class ChukwaRecordReader implements
45 RecordReader<LongWritable, Text> {
46
47 static Logger LOG = Logger.getLogger(ChukwaInputFormat.class);
48
49 private SequenceFileRecordReader<ChukwaArchiveKey, Chunk> sfrr;
50 private long lineInFile = 0;
51 private Chunk curChunk = null;
52 private int lineInChunk;
53
54 private int[] lineOffsets = null;
55 private int byteOffsetOfLastLine = 0;
56 Pattern dtPattern;
57
58 public ChukwaRecordReader(Configuration conf, FileSplit split)
59 throws IOException {
60 sfrr = new SequenceFileRecordReader<ChukwaArchiveKey, Chunk>(conf, split);
61 dtPattern = Pattern
62 .compile(conf.get("chukwa.inputfilter.datatype", ".*"));
63 }
64
65 @Override
66 public void close() throws IOException {
67 sfrr.close();
68 }
69
70 @Override
71 public LongWritable createKey() {
72 return new LongWritable();
73 }
74
75 @Override
76 public Text createValue() {
77 return new Text();
78 }
79
80 @Override
81 public long getPos() throws IOException {
82 return sfrr.getPos();
83 }
84
85 @Override
86 public float getProgress() throws IOException {
87 return sfrr.getProgress();
88 }
89
90 private boolean passesFilters(Chunk c) {
91 return dtPattern.matcher(c.getDataType()).matches();
92 }
93
94 @Override
95 public boolean next(LongWritable key, Text value) throws IOException {
96 if (curChunk == null) {
97 ChukwaArchiveKey k = new ChukwaArchiveKey();
98 curChunk = ChunkImpl.getBlankChunk();
99 boolean unfilteredChunk = false;
100 while (!unfilteredChunk) {
101 boolean readOK = sfrr.next(k, curChunk);
102 if (!readOK) {
103 curChunk = null;
104 return false;
105 }
106 unfilteredChunk = passesFilters(curChunk);
107 }
108 lineOffsets = curChunk.getRecordOffsets();
109 lineInChunk = 0;
110 byteOffsetOfLastLine = 0;
111 }
112 value.set(curChunk.getData(), byteOffsetOfLastLine,
113 lineOffsets[lineInChunk] - byteOffsetOfLastLine);
114 if (lineInChunk >= lineOffsets.length - 1) {
115 curChunk = null;
116 } else
117 byteOffsetOfLastLine = lineOffsets[lineInChunk++] + 1;
118
119 key.set(lineInFile);
120 lineInFile++;
121 return true;
122 }
123 }
124
125 @Override
126 public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
127 JobConf job, Reporter reporter) throws IOException {
128 reporter.setStatus(split.toString());
129 LOG.info("returning a new chukwa record reader");
130 return new ChukwaRecordReader(job, (FileSplit) split);
131 }
132
133 }