View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.Map.Entry;
24  import java.util.regex.Matcher;
25  import java.util.regex.Pattern;
26  
27  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
30  import org.apache.hadoop.mapred.OutputCollector;
31  import org.apache.hadoop.mapred.Reporter;
32  import org.apache.log4j.Logger;
33  
34  @Table(name="SystemMetrics",columnFamily="Ps")
35  public class Ps extends AbstractProcessor {
36    static Logger log = Logger.getLogger(Ps.class);
37    public static final String reduceType = "Ps";
38  
39    @Override
40    protected void parse(String recordEntry,
41        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
42        throws Throwable {
43      LogEntry log = new LogEntry(recordEntry);
44      PsOutput ps = new PsOutput(log.getBody());
45      for (HashMap<String, String> processInfo : ps.getProcessList()) {
46        key = new ChukwaRecordKey();
47        ChukwaRecord record = new ChukwaRecord();
48        this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType);
49        for (Entry<String, String> entry : processInfo.entrySet()) {
50          record.add(entry.getKey(), entry.getValue());
51        }
52        output.collect(key, record);
53      }
54    }
55  
56    public static class PsOutput {
57  
58      // processes info
59      private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
60  
61      public PsOutput(String psCmdOutput) throws InvalidPsRecord {
62        if (psCmdOutput == null || psCmdOutput.length() == 0)
63          return;
64  
65        String[] lines = psCmdOutput.split("[\n\r]+");
66  
67        // at least two lines
68        if (lines.length < 2)
69          return;
70  
71        // header
72        ArrayList<String> header = new ArrayList<String>();
73        Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
74        while (matcher.find()) {
75          header.add(matcher.group(0));
76        }
77        if (!header.get(header.size() - 1).equals("CMD")) {
78          throw new InvalidPsRecord("CMD must be the last column");
79        }
80  
81        // records
82        boolean foundInitCmd = false;
83        for (int line = 1; line < lines.length; line++) {
84          HashMap<String, String> record = new HashMap<String, String>();
85          recordList.add(record);
86  
87          matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
88          for (int index = 0; index < header.size(); index++) {
89            String key = header.get(index);
90            matcher.find();
91            if (!key.equals("CMD")) {
92              String value = matcher.group(0);
93              /**
94               * For STARTED column, it could be in two formats: "MMM dd" or
95               * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice to
96               * the date if it's with "MMM dd" format.
97               */
98              if (key.equals("STARTED")) {
99                char c = value.charAt(0);
100               if (c < '0' || c > '9') {
101                 matcher.find();
102                 value += matcher.group(0);
103               }
104             }
105             record.put(key, value);
106           } else {
107             // reached the cmd part. all remains should be put
108             // together as the command
109             String value = lines[line].substring(matcher.start());
110             record.put(key, value);
111             if (!foundInitCmd)
112               foundInitCmd = value.startsWith("init");
113             break;
114           }
115         }
116       }
117       if (!foundInitCmd)
118         throw new InvalidPsRecord("Did not find 'init' cmd");
119     }
120 
121     public ArrayList<HashMap<String, String>> getProcessList() {
122       return recordList;
123     }
124   }
125 
126   public static class InvalidPsRecord extends Exception {
127     private static final long serialVersionUID = 1L;
128 
129     public InvalidPsRecord() {
130     }
131 
132     public InvalidPsRecord(String arg0) {
133       super(arg0);
134     }
135 
136     public InvalidPsRecord(Throwable arg0) {
137       super(arg0);
138     }
139 
140     public InvalidPsRecord(String arg0, Throwable arg1) {
141       super(arg0, arg1);
142     }
143   }
144 }