1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
27 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
28 import org.apache.hadoop.mapred.OutputCollector;
29 import org.apache.hadoop.mapred.Reporter;
30 import org.apache.log4j.Logger;
31
32 public class Torque extends AbstractProcessor {
33
34 static Logger log = Logger.getLogger(Torque.class);
35 private SimpleDateFormat sdf = null;
36
37 public Torque() {
38
39 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
40 }
41
42 @Override
43 protected void parse(String recordEntry,
44 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
45 throws Throwable {
46 try {
47 String dStr = recordEntry.substring(0, 23);
48 int start = 24;
49 int idx = recordEntry.indexOf(' ', start);
50 start = idx + 1;
51 idx = recordEntry.indexOf(' ', start);
52 String body = recordEntry.substring(idx + 1);
53 body = body.replaceAll("\n", "");
54 Date d = sdf.parse(dStr);
55 String[] kvpairs = body.split(", ");
56
57 ChukwaRecord record = new ChukwaRecord();
58 String kvpair = null;
59 String[] halves = null;
60 boolean containRecord = false;
61 for (int i = 0; i < kvpairs.length; ++i) {
62 kvpair = kvpairs[i];
63 if (kvpair.indexOf("=") >= 0) {
64 halves = kvpair.split("=");
65 record.add(halves[0], halves[1]);
66 containRecord = true;
67 }
68 }
69 if (record.containsField("Machine")) {
70 buildGenericRecord(record, null, d.getTime(), "HodMachine");
71 } else {
72 buildGenericRecord(record, null, d.getTime(), "HodJob");
73 }
74 if (containRecord) {
75 output.collect(key, record);
76 }
77 } catch (ParseException e) {
78 e.printStackTrace();
79 log.warn("Wrong format in Torque [" + recordEntry + "]", e);
80 throw e;
81 } catch (IOException e) {
82 e.printStackTrace();
83 log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
84 throw e;
85 }
86
87 }
88
89 public String getDataType() {
90 return Torque.class.getName();
91 }
92
93 }