1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19
20
21 import java.io.IOException;
22 import java.text.ParseException;
23 import java.text.SimpleDateFormat;
24 import java.util.Date;
25 import java.util.Map;
26 import java.util.HashMap;
27 import java.util.regex.Pattern;
28 import java.util.regex.Matcher;
29
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.chukwa.extraction.demux.Demux;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36 import org.apache.hadoop.mapred.JobConf;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 @Table(name="TsProcessor",columnFamily="log")
64 public class TsProcessor extends AbstractProcessor {
65 static Logger log = Logger.getLogger(TsProcessor.class);
66
67 public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
68
69 private Map<String, Pattern> datePatternMap;
70 private Map<String, SimpleDateFormat> dateFormatMap;
71
72 public TsProcessor() {
73 datePatternMap = new HashMap<String, Pattern>();
74 dateFormatMap = new HashMap<String, SimpleDateFormat>();
75 }
76
77 @Override
78 protected void parse(String recordEntry,
79 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
80 throws Throwable {
81 try {
82 SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
83 Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
84 String dStr = null;
85
86
87 if(datePattern != null) {
88 Matcher m = datePattern.matcher(recordEntry);
89 if (!m.matches() || m.groupCount() < 1) {
90 throw new ParseException("Regex " + datePattern +
91 " couldn't extract date string from record: " + recordEntry, 0);
92 }
93 else {
94 dStr = m.group(1);
95 }
96 }
97 else {
98 dStr = recordEntry;
99 }
100
101 Date d = sdf.parse(dStr);
102 ChukwaRecord record = new ChukwaRecord();
103 this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
104 .getDataType());
105 output.collect(key, record);
106 } catch (ParseException e) {
107 log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
108 + "]", e);
109 e.printStackTrace();
110 throw e;
111 } catch (IOException e) {
112 log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
113 + "]", e);
114 e.printStackTrace();
115 throw e;
116 }
117
118 }
119
120
121
122
123
124
125 private SimpleDateFormat fetchDateFormat(String dataType) {
126 if (dateFormatMap.get(dataType) != null) {
127 return dateFormatMap.get(dataType);
128 }
129
130 JobConf jobConf = Demux.jobConf;
131 String dateFormat = DEFAULT_DATE_FORMAT;
132
133 if (jobConf != null) {
134 dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
135 dateFormat = jobConf.get("TsProcessor.time.format." + chunk.getDataType(),
136 dateFormat);
137 }
138
139 SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
140 dateFormatMap.put(dataType, sdf);
141
142 return sdf;
143 }
144
145
146
147
148
149
150
151 private Pattern fetchDateLocationPattern(String dataType) {
152 if (datePatternMap.containsKey(dataType)) {
153 return datePatternMap.get(dataType);
154 }
155
156 JobConf jobConf = Demux.jobConf;
157 String datePattern = null;
158
159 if (jobConf != null) {
160 datePattern = jobConf.get("TsProcessor.default.time.regex", null);
161 datePattern = jobConf.get("TsProcessor.time.regex." + chunk.getDataType(),
162 datePattern);
163 }
164
165 Pattern pattern = datePattern != null ? Pattern.compile(datePattern) : null;
166 datePatternMap.put(dataType, pattern);
167
168 return pattern;
169 }
170
171 }