1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26
27 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.mapred.OutputCollector;
32 import org.apache.hadoop.mapred.Reporter;
33 import org.apache.log4j.Logger;
34
35 @Tables(annotations={
36 @Table(name="HadoopLog",columnFamily="NameNode"),
37 @Table(name="HadoopLog",columnFamily="DataNode"),
38 @Table(name="HadoopLog",columnFamily="Audit")
39 })
40 public class HadoopLogProcessor extends AbstractProcessor {
41 static Logger log = Logger.getLogger(HadoopLogProcessor.class);
42
43 private static final String recordType = "HadoopLog";
44 private static final String nameNodeType = "NameNode";
45 private static final String dataNodeType = "DataNode";
46 private static final String auditType = "Audit";
47 private SimpleDateFormat sdf = null;
48
49 public HadoopLogProcessor() {
50
51 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
52 }
53
54 @Override
55 public void parse(String recordEntry,
56 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57 throws Throwable {
58 try {
59 String dStr = recordEntry.substring(0, 23);
60 Date d = sdf.parse(dStr);
61 ChukwaRecord record = new ChukwaRecord();
62
63 if (this.chunk.getStreamName().indexOf("datanode") > 0) {
64 buildGenericRecord(record, recordEntry, d.getTime(), dataNodeType);
65 } else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
66 buildGenericRecord(record, recordEntry, d.getTime(), nameNodeType);
67 } else if (this.chunk.getStreamName().indexOf("audit") > 0) {
68 buildGenericRecord(record, recordEntry, d.getTime(), auditType);
69 } else {
70 buildGenericRecord(record, recordEntry, d.getTime(), recordType);
71 }
72
73 output.collect(key, record);
74 } catch (ParseException e) {
75 log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
76 + "]", e);
77 e.printStackTrace();
78 throw e;
79 } catch (IOException e) {
80 log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
81 + "]", e);
82 e.printStackTrace();
83 throw e;
84 }
85 }
86
87 public String getDataType() {
88 return HadoopLogProcessor.recordType;
89 }
90
91 }