1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20
21
22 import java.io.IOException;
23 import java.util.Iterator;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26 import org.apache.hadoop.chukwa.extraction.engine.Record;
27 import org.apache.hadoop.mapred.OutputCollector;
28 import org.apache.hadoop.mapred.Reporter;
29 import org.apache.log4j.Logger;
30
31 public class JobLogHistoryReduceProcessor implements ReduceProcessor {
32 static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
33
34 @Override
35 public String getDataType() {
36 return this.getClass().getName();
37 }
38
39 @Override
40 public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
41 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
42 try {
43 String action = key.getKey();
44 int count = 0;
45
46 ChukwaRecord record = null;
47 while (values.hasNext()) {
48 record = values.next();
49 if (record.containsField("START_TIME")) {
50 count++;
51 } else {
52 count--;
53 }
54 }
55 ChukwaRecordKey newKey = new ChukwaRecordKey();
56 newKey.setKey("" + record.getTime());
57 newKey.setReduceType("MSSRGraph");
58 ChukwaRecord newRecord = new ChukwaRecord();
59 newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
60 newRecord.setTime(record.getTime());
61 newRecord.add("count", "" + count);
62 newRecord.add("JOBID", record.getValue("JOBID"));
63 if (action.indexOf("JobLogHist/Map/") >= 0) {
64 newRecord.add("type", "MAP");
65 } else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0) {
66 newRecord.add("type", "SHUFFLE");
67 } else if (action.indexOf("JobLogHist/SORT/") >= 0) {
68 newRecord.add("type", "SORT");
69 } else if (action.indexOf("JobLogHist/REDUCE/") >= 0) {
70 newRecord.add("type", "REDUCE");
71 }
72
73 output.collect(newKey, newRecord);
74 } catch (IOException e) {
75 log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
76 + key + "]", e);
77 e.printStackTrace();
78 }
79
80 }
81
82 }