View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.io.IOException;
23  import java.text.ParseException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  
27  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.mapred.OutputCollector;
32  import org.apache.hadoop.mapred.Reporter;
33  import org.apache.log4j.Logger;
34  
35  @Tables(annotations={
36  @Table(name="HadoopLog",columnFamily="NameNode"),
37  @Table(name="HadoopLog",columnFamily="DataNode"),
38  @Table(name="HadoopLog",columnFamily="Audit")
39  })
40  public class HadoopLogProcessor extends AbstractProcessor {
41    static Logger log = Logger.getLogger(HadoopLogProcessor.class);
42  
43    private static final String recordType = "HadoopLog";
44    private static final String nameNodeType = "NameNode";
45    private static final String dataNodeType = "DataNode";
46    private static final String auditType = "Audit";
47    private SimpleDateFormat sdf = null;
48  
49    public HadoopLogProcessor() {
50      // TODO move that to config
51      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
52    }
53  
54    @Override
55    public void parse(String recordEntry,
56        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57        throws Throwable {
58      try {
59        String dStr = recordEntry.substring(0, 23);
60        Date d = sdf.parse(dStr);
61        ChukwaRecord record = new ChukwaRecord();
62  
63        if (this.chunk.getStreamName().indexOf("datanode") > 0) {
64          buildGenericRecord(record, recordEntry, d.getTime(), dataNodeType);
65        } else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
66          buildGenericRecord(record, recordEntry, d.getTime(), nameNodeType);
67        } else if (this.chunk.getStreamName().indexOf("audit") > 0) {
68          buildGenericRecord(record, recordEntry, d.getTime(), auditType);
69        } else {
70          buildGenericRecord(record, recordEntry, d.getTime(), recordType);
71        }
72  
73        output.collect(key, record);
74      } catch (ParseException e) {
75        log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
76            + "]", e);
77        e.printStackTrace();
78        throw e;
79      } catch (IOException e) {
80        log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
81            + "]", e);
82        e.printStackTrace();
83        throw e;
84      }
85    }
86  
87    public String getDataType() {
88      return HadoopLogProcessor.recordType;
89    }
90  
91  }