1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
19
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26 import org.apache.hadoop.chukwa.extraction.engine.Record;
27 import org.apache.hadoop.mapred.OutputCollector;
28 import org.apache.hadoop.mapred.Reporter;
29 import org.apache.log4j.Logger;
30
31 public class MRJobReduceProcessor implements ReduceProcessor {
32 static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
33
34 @Override
35 public String getDataType() {
36 return MRJobReduceProcessor.class.getName();
37 }
38
39 @Override
40 public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
41 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
42 try {
43 HashMap<String, String> data = new HashMap<String, String>();
44
45 ChukwaRecord record = null;
46 String[] fields = null;
47 while (values.hasNext()) {
48 record = values.next();
49 fields = record.getFields();
50 for (String field : fields) {
51 data.put(field, record.getValue(field));
52 }
53 }
54
55
56 long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
57
58
59
60
61
62 String jobConf = data.get("JOBCONF");
63 int idx = jobConf.indexOf("mapredsystem/");
64 idx += 13;
65 int idx2 = jobConf.indexOf(".", idx);
66 data.put("HodId", jobConf.substring(idx, idx2));
67
68 ChukwaRecordKey newKey = new ChukwaRecordKey();
69 newKey.setKey("" + initTime);
70 newKey.setReduceType("MRJob");
71
72 ChukwaRecord newRecord = new ChukwaRecord();
73 newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
74 newRecord.setTime(initTime);
75 newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
76 Iterator<String> it = data.keySet().iterator();
77 while (it.hasNext()) {
78 String field = it.next();
79 newRecord.add(field, data.get(field));
80 }
81
82 output.collect(newKey, newRecord);
83 } catch (IOException e) {
84 log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
85 + key + "]", e);
86 e.printStackTrace();
87 }
88
89 }
90
91 }