1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23
24 import org.apache.hadoop.chukwa.extraction.engine.Record;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27 import org.apache.hadoop.mapred.OutputCollector;
28 import org.apache.hadoop.mapred.Reporter;
29 import org.apache.log4j.Logger;
30
31 public class ClientTrace implements ReduceProcessor {
32
33 static final Logger log = Logger.getLogger(ClientTrace.class);
34
35 @Override
36 public String getDataType() {
37 return this.getClass().getName();
38 }
39
40 @Override
41 public void process(ChukwaRecordKey key,
42 Iterator<ChukwaRecord> values,
43 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
44 Reporter reporter) {
45 try {
46 long bytes = 0L;
47 ChukwaRecord rec = null;
48 while (values.hasNext()) {
49
50 rec = values.next();
51 bytes += Long.valueOf(rec.getValue("bytes"));
52
53
54
55 ChukwaRecordKey detailed_key = new ChukwaRecordKey();
56 String [] k = key.getKey().split("/");
57 String full_timestamp = null;
58 full_timestamp = rec.getValue("actual_time");
59 detailed_key.setReduceType("ClientTraceDetailed");
60 detailed_key.setKey(k[0]+"/"+k[1]+"_"+k[2]+"/"+full_timestamp);
61 output.collect(detailed_key, rec);
62 }
63 if (null == rec) {
64 return;
65 }
66 ChukwaRecord emit = new ChukwaRecord();
67 emit.add(Record.tagsField, rec.getValue(Record.tagsField));
68 emit.add(Record.sourceField, "undefined");
69 emit.add(Record.applicationField, rec.getValue(Record.applicationField));
70
71 String[] k = key.getKey().split("/");
72 emit.add(k[1] + "_" + k[2], String.valueOf(bytes));
73 emit.setTime(Long.valueOf(k[3]));
74 output.collect(key, emit);
75
76 } catch (IOException e) {
77 log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
78 }
79 }
80 }