View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.io.IOException;
23  import java.text.ParseException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
27  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
28  import org.apache.hadoop.mapred.OutputCollector;
29  import org.apache.hadoop.mapred.Reporter;
30  import org.apache.log4j.Logger;
31  
32  public class Torque extends AbstractProcessor {
33  
34    static Logger log = Logger.getLogger(Torque.class);
35    private SimpleDateFormat sdf = null;
36  
37    public Torque() {
38      // TODO move that to config
39      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
40    }
41  
42    @Override
43    protected void parse(String recordEntry,
44        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
45        throws Throwable {
46      try {
47        String dStr = recordEntry.substring(0, 23);
48        int start = 24;
49        int idx = recordEntry.indexOf(' ', start);
50        start = idx + 1;
51        idx = recordEntry.indexOf(' ', start);
52        String body = recordEntry.substring(idx + 1);
53        body = body.replaceAll("\n", "");
54        Date d = sdf.parse(dStr);
55        String[] kvpairs = body.split(", ");
56  
57        ChukwaRecord record = new ChukwaRecord();
58        String kvpair = null;
59        String[] halves = null;
60        boolean containRecord = false;
61        for (int i = 0; i < kvpairs.length; ++i) {
62          kvpair = kvpairs[i];
63          if (kvpair.indexOf("=") >= 0) {
64            halves = kvpair.split("=");
65            record.add(halves[0], halves[1]);
66            containRecord = true;
67          }
68        }
69        if (record.containsField("Machine")) {
70          buildGenericRecord(record, null, d.getTime(), "HodMachine");
71        } else {
72          buildGenericRecord(record, null, d.getTime(), "HodJob");
73        }
74        if (containRecord) {
75          output.collect(key, record);
76        }
77      } catch (ParseException e) {
78        e.printStackTrace();
79        log.warn("Wrong format in Torque [" + recordEntry + "]", e);
80        throw e;
81      } catch (IOException e) {
82        e.printStackTrace();
83        log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
84        throw e;
85      }
86  
87    }
88  
89    public String getDataType() {
90      return Torque.class.getName();
91    }
92  
93  }