View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.Iterator;
22  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
23  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
24  import org.apache.hadoop.mapred.OutputCollector;
25  import org.apache.hadoop.mapred.Reporter;
26  import org.apache.log4j.Logger;
27  import org.json.JSONObject;
28  
29  public class Log4JMetricsContextProcessor extends AbstractProcessor {
30  
31    static Logger log = Logger.getLogger(Log4JMetricsContextProcessor.class);
32  
33    @Override
34    protected void parse(String recordEntry,
35        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
36        throws Throwable 
37    {
38      Log4JMetricsContextChukwaRecord record = new Log4JMetricsContextChukwaRecord(recordEntry);
39      ChukwaRecord chukwaRecord = record.getChukwaRecord();
40      this.buildGenericRecord(chukwaRecord, null, record.getTimestamp(), record.getRecordType());
41      output.collect(key, chukwaRecord);
42    }
43  
44    // create a static class to cove most of the code for unit test 
45    static class Log4JMetricsContextChukwaRecord {
46      private String recordType = null;
47      private long timestamp = 0;
48      private ChukwaRecord chukwaRecord = new ChukwaRecord();
49      
50      @SuppressWarnings("unchecked")
51      public Log4JMetricsContextChukwaRecord(String recordEntry) throws Throwable {
52        LogEntry log = new LogEntry(recordEntry);
53        JSONObject json = new JSONObject(log.getBody());
54  
55        // round timestamp
56        timestamp = json.getLong("chukwa_timestamp");
57        timestamp = (timestamp / 60000) * 60000;
58  
59        // get record type
60        String contextName = json.getString("contextName");
61        String recordName = json.getString("recordName");
62        recordType = contextName;
63        if (!contextName.equals(recordName)) {
64          recordType += "_" + recordName;
65        }
66  
67        Iterator<String> ki = json.keys();
68        while (ki.hasNext()) {
69          String key = ki.next();
70          String value = json.getString(key);
71          if(value != null) {
72            chukwaRecord.add(key, value);
73          }
74        }
75      }
76  
77      public String getRecordType() {
78        return recordType;
79      }
80  
81      public long getTimestamp() {
82        return timestamp;
83      }
84      
85      public ChukwaRecord getChukwaRecord() {
86        return chukwaRecord;
87      }
88    }
89  }
90