1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.Iterator;
28 import java.util.List;
29 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
30 import org.apache.hadoop.chukwa.ChunkImpl;
31 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
32 import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
33 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
34 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
35 import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
36 import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
37 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
38 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
39 import org.apache.hadoop.chukwa.util.ExceptionUtil;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.conf.Configured;
42 import org.apache.hadoop.filecache.DistributedCache;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.mapred.FileInputFormat;
47 import org.apache.hadoop.mapred.FileOutputFormat;
48 import org.apache.hadoop.mapred.JobClient;
49 import org.apache.hadoop.mapred.JobConf;
50 import org.apache.hadoop.mapred.JobPriority;
51 import org.apache.hadoop.mapred.MapReduceBase;
52 import org.apache.hadoop.mapred.Mapper;
53 import org.apache.hadoop.mapred.OutputCollector;
54 import org.apache.hadoop.mapred.Reducer;
55 import org.apache.hadoop.mapred.Reporter;
56 import org.apache.hadoop.mapred.SequenceFileInputFormat;
57 import org.apache.hadoop.util.Tool;
58 import org.apache.hadoop.util.ToolRunner;
59 import org.apache.log4j.Logger;
60
61 public class Demux extends Configured implements Tool {
62 static Logger log = Logger.getLogger(Demux.class);
63 static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
64 public static JobConf jobConf = null;
65
66 public static class MapClass extends MapReduceBase implements
67 Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
68
69 @Override
70 public void configure(JobConf jobConf) {
71 super.configure(jobConf);
72 Demux.jobConf= jobConf;
73 }
74
75 public void map(ChukwaArchiveKey key, ChunkImpl chunk,
76 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
77 throws IOException {
78
79 ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
80 "DemuxMapOutput", output, reporter);
81 try {
82 long duration = System.currentTimeMillis();
83 if (log.isDebugEnabled()) {
84 log.debug("Entry: [" + chunk.getData() + "] EventType: ["
85 + chunk.getDataType() + "]");
86 }
87
88 String defaultProcessor = Demux.jobConf.get(
89 "chukwa.demux.mapper.default.processor",
90 "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
91
92 String processorClass = Demux.jobConf.get(chunk.getDataType(),
93 defaultProcessor);
94
95 if (!processorClass.equalsIgnoreCase("Drop")) {
96 reporter.incrCounter("DemuxMapInput", "total chunks", 1);
97 reporter.incrCounter("DemuxMapInput",
98 chunk.getDataType() + " chunks", 1);
99
100 MapProcessor processor = MapProcessorFactory
101 .getProcessor(processorClass);
102 processor.process(key, chunk, chukwaOutputCollector, reporter);
103 if (log.isDebugEnabled()) {
104 duration = System.currentTimeMillis() - duration;
105 log.debug("Demux:Map dataType:" + chunk.getDataType()
106 + " duration:" + duration + " processor:" + processorClass
107 + " recordCount:" + chunk.getRecordOffsets().length);
108 }
109
110 } else {
111 log.info("action:Demux, dataType:" + chunk.getDataType()
112 + " duration:0 processor:Drop recordCount:"
113 + chunk.getRecordOffsets().length);
114 }
115
116 } catch (Exception e) {
117 log.warn("Exception in Demux:MAP", e);
118 e.printStackTrace();
119 }
120 }
121 }
122
123 public static class ReduceClass extends MapReduceBase implements
124 Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
125
126 public void configure(JobConf jobConf) {
127 super.configure(jobConf);
128 Demux.jobConf = jobConf;
129 }
130
131 public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
132 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
133 throws IOException {
134 ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
135 "DemuxReduceOutput", output, reporter);
136 try {
137 long duration = System.currentTimeMillis();
138 reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
139 reporter.incrCounter("DemuxReduceInput", key.getReduceType()
140 + " total distinct keys", 1);
141
142 String defaultProcessor = Demux.jobConf.get(
143 "chukwa.demux.reducer.default.processor", null);
144 ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
145 key.getReduceType(), defaultProcessor);
146
147 processor.process(key, values, chukwaOutputCollector, reporter);
148
149 if (log.isDebugEnabled()) {
150 duration = System.currentTimeMillis() - duration;
151 log.debug("Demux:Reduce, dataType:" + key.getReduceType()
152 + " duration:" + duration);
153 }
154
155 } catch (Exception e) {
156 log.warn("Exception in Demux:Reduce", e);
157 e.printStackTrace();
158 }
159 }
160 }
161
162 static int printUsage() {
163 System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
164 ToolRunner.printGenericCommandUsage(System.out);
165 return -1;
166 }
167
168 public static void addParsers(Configuration conf) {
169 String parserPath = conf.get("chukwa.data.dir")+File.separator+"demux";
170 try {
171 FileSystem fs = FileSystem.get(new Configuration());
172 FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
173 if(fstatus!=null) {
174 String hdfsUrlPrefix = conf.get("fs.default.name");
175
176 for(FileStatus parser : fstatus) {
177 Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
178 log.debug("Adding parser JAR path " + jarPath);
179 DistributedCache.addFileToClassPath(jarPath, conf);
180 }
181 }
182 } catch (IOException e) {
183 log.error(ExceptionUtil.getStackTrace(e));
184 }
185 }
186
187 public int run(String[] args) throws Exception {
188 JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
189
190
191 conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
192 conf.setInputFormat(SequenceFileInputFormat.class);
193 conf.setMapperClass(Demux.MapClass.class);
194 conf.setPartitionerClass(ChukwaRecordPartitioner.class);
195 conf.setReducerClass(Demux.ReduceClass.class);
196
197 conf.setOutputKeyClass(ChukwaRecordKey.class);
198 conf.setOutputValueClass(ChukwaRecord.class);
199 conf.setOutputFormat(ChukwaRecordOutputFormat.class);
200 conf.setJobPriority(JobPriority.VERY_HIGH);
201 addParsers(conf);
202
203 List<String> other_args = new ArrayList<String>();
204 for (int i = 0; i < args.length; ++i) {
205 try {
206 if ("-m".equals(args[i])) {
207 conf.setNumMapTasks(Integer.parseInt(args[++i]));
208 } else if ("-r".equals(args[i])) {
209 conf.setNumReduceTasks(Integer.parseInt(args[++i]));
210 } else {
211 other_args.add(args[i]);
212 }
213 } catch (NumberFormatException except) {
214 System.out.println("ERROR: Integer expected instead of " + args[i]);
215 return printUsage();
216 } catch (ArrayIndexOutOfBoundsException except) {
217 System.out.println("ERROR: Required parameter missing from "
218 + args[i - 1]);
219 return printUsage();
220 }
221 }
222
223 if (other_args.size() != 2) {
224 System.out.println("ERROR: Wrong number of parameters: "
225 + other_args.size() + " instead of 2.");
226 return printUsage();
227 }
228
229 FileInputFormat.setInputPaths(conf, other_args.get(0));
230 FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
231
232 JobClient.runJob(conf);
233 return 0;
234 }
235
236 public static void main(String[] args) throws Exception {
237 int res = ToolRunner.run(new Configuration(), new Demux(), args);
238 System.exit(res);
239 }
240
241 }