View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19  
20  
21  import java.io.IOException;
22  import java.text.ParseException;
23  import java.text.SimpleDateFormat;
24  import java.util.Date;
25  import java.util.Map;
26  import java.util.HashMap;
27  import java.util.regex.Pattern;
28  import java.util.regex.Matcher;
29  
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.chukwa.extraction.demux.Demux;
34  import org.apache.hadoop.mapred.OutputCollector;
35  import org.apache.hadoop.mapred.Reporter;
36  import org.apache.hadoop.mapred.JobConf;
37  import org.apache.log4j.Logger;
38  
39  /**
40   * TsProcessor is a generic processor that can be configured to find the timestamp
41   * in the text of a record. By default, this class expects that a record
42   * starts with a date in this format: <code>yyyy-MM-dd HH:mm:ss,SSS</code>
43   * <P>
44   * This format can be changed with the following configurations.
45   * <UL>
46   * <LI><code>TsProcessor.default.time.format</code> - Changes the default time
47   * format used by all data types.</LI>
48   * <LI><code>TsProcessor.time.format.[some_data_type]</code> - Overrides the default
49   * format for a specific data type.</LI>
50   * </UL>
51   * If the time string is not at the beginning of the record you can configure a
52   * regular expression to locate the timestamp text with either of the following
53   * configurations. The text found in group 1 of the regular expression match
54   * will be used with the configured date format.
55   * <UL>
56   * <LI><code>TsProcessor.default.time.regex</code> - Changes the default time
57   * location regex of the time text for all data types.</LI>
58   * <LI><code>TsProcessor.time.regex.[some_data_type]</code> - Overrides the
59   * default time location regex for a specific data type.</LI>
60   * </UL>
61   *
62   */
63  @Table(name="TsProcessor",columnFamily="log")
64  public class TsProcessor extends AbstractProcessor {
65    static Logger log = Logger.getLogger(TsProcessor.class);
66  
67    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
68  
69    private Map<String, Pattern> datePatternMap;
70    private Map<String, SimpleDateFormat> dateFormatMap;
71  
72    public TsProcessor() {
73      datePatternMap = new HashMap<String, Pattern>();
74      dateFormatMap = new HashMap<String, SimpleDateFormat>();
75    }
76  
77    @Override
78    protected void parse(String recordEntry,
79        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
80        throws Throwable {
81      try {
82        SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
83        Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
84        String dStr = null;
85  
86        // fetch the part of the record that contains the date.
87        if(datePattern != null) {
88          Matcher m = datePattern.matcher(recordEntry);
89          if (!m.matches() || m.groupCount() < 1) {
90            throw new ParseException("Regex " + datePattern +
91                    " couldn't extract date string from record: " + recordEntry, 0);
92          }
93          else {
94            dStr = m.group(1);
95          }
96        }
97        else {
98          dStr = recordEntry;
99        }
100 
101       Date d = sdf.parse(dStr);
102       ChukwaRecord record = new ChukwaRecord();
103       this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
104           .getDataType());
105       output.collect(key, record);
106     } catch (ParseException e) {
107       log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
108           + "]", e);
109       e.printStackTrace();
110       throw e;
111     } catch (IOException e) {
112       log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
113           + "]", e);
114       e.printStackTrace();
115       throw e;
116     }
117 
118   }
119   
120   /**
121    * For a given dataType, returns the SimpeDateFormat to use.
122    * @param dataType
123    * @return
124    */
125   private SimpleDateFormat fetchDateFormat(String dataType) {
126     if (dateFormatMap.get(dataType) != null) {
127       return dateFormatMap.get(dataType);
128     }
129 
130     JobConf jobConf = Demux.jobConf;
131     String dateFormat = DEFAULT_DATE_FORMAT;
132 
133     if (jobConf != null) {
134       dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
135       dateFormat = jobConf.get("TsProcessor.time.format." + chunk.getDataType(),
136                                dateFormat);
137     }
138 
139     SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
140     dateFormatMap.put(dataType, sdf);
141 
142     return sdf;
143   }
144 
145   /**
146    * For a given dataType, returns a Pattern that will produce the date portion
147    * of the string.
148    * @param dataType
149    * @return
150    */
151   private Pattern fetchDateLocationPattern(String dataType) {
152     if (datePatternMap.containsKey(dataType)) {
153       return datePatternMap.get(dataType);
154     }
155 
156     JobConf jobConf = Demux.jobConf;
157     String datePattern = null;
158 
159     if (jobConf != null) {
160       datePattern = jobConf.get("TsProcessor.default.time.regex", null);
161       datePattern = jobConf.get("TsProcessor.time.regex." + chunk.getDataType(),
162                                datePattern);
163     }
164 
165     Pattern pattern = datePattern != null ? Pattern.compile(datePattern) : null;
166     datePatternMap.put(dataType, pattern);
167 
168     return pattern;
169   }
170 
171 }