1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Map.Entry;
22
23 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
25 import org.apache.hadoop.chukwa.inputtools.jplugin.ChukwaMetrics;
26 import org.apache.hadoop.chukwa.inputtools.jplugin.GenericChukwaMetricsList;
27 import org.apache.hadoop.mapred.OutputCollector;
28 import org.apache.hadoop.mapred.Reporter;
29
30 public class JPluginMapper extends AbstractProcessor {
31 @Override
32 protected void parse(String recordEntry,
33 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
34 Reporter reporter) throws Throwable {
35 LogEntry entry = new LogEntry(recordEntry);
36 String xml = entry.getBody();
37 GenericChukwaMetricsList metricsList = new GenericChukwaMetricsList();
38 metricsList.fromXml(xml);
39 String recType = metricsList.getRecordType();
40 long timestamp = metricsList.getTimestamp();
41 for (ChukwaMetrics metrics : metricsList.getMetricsList()) {
42 key = new ChukwaRecordKey();
43 ChukwaRecord record = new ChukwaRecord();
44 this.buildGenericRecord(record, null, -1l, recType);
45 record.setTime(timestamp);
46 key.setKey(getKey(timestamp, metrics.getKey()));
47 record.add("key", metrics.getKey());
48 for (Entry<String, String> attr : metrics.getAttributes().entrySet()) {
49 record.add(attr.getKey(), attr.getValue());
50 }
51 output.collect(key, record);
52 }
53 }
54
55 private String getKey(long ts, String metricsKey) {
56 long unit = 60 * 60 * 1000;
57 long rounded = (ts / unit) * unit;
58 return rounded + "/" + metricsKey + "/" + ts;
59 }
60 }