View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
33  
34  @Table(name="SystemMetrics",columnFamily="Disk")
35  public class Df extends AbstractProcessor {
36    static Logger log = Logger.getLogger(Df.class);
37    
38    private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
39        "Used", "Available", "Use%", "Mounted", "on" };
40    private static final String[] headerCols = { "Filesystem", "1K-blocks",
41        "Used", "Available", "Use%", "Mounted on" };
42    private SimpleDateFormat sdf = null;
43  
44    public Df() {
45      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
46    }
47  
48    @Override
49    protected void parse(String recordEntry,
50        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51        throws Throwable {
52  
53      try {
54        String dStr = recordEntry.substring(0, 23);
55        int start = 24;
56        int idx = recordEntry.indexOf(' ', start);
57        // String level = recordEntry.substring(start, idx);
58        start = idx + 1;
59        idx = recordEntry.indexOf(' ', start);
60        // String className = recordEntry.substring(start, idx-1);
61        String body = recordEntry.substring(idx + 1);
62  
63        Date d = sdf.parse(dStr);
64        String[] lines = body.split("\n");
65  
66        String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++");
67  
68        if (outputCols.length != headerSplitCols.length
69            || outputCols[0].intern() != headerSplitCols[0].intern()
70            || outputCols[1].intern() != headerSplitCols[1].intern()
71            || outputCols[2].intern() != headerSplitCols[2].intern()
72            || outputCols[3].intern() != headerSplitCols[3].intern()
73            || outputCols[4].intern() != headerSplitCols[4].intern()
74            || outputCols[5].intern() != headerSplitCols[5].intern()
75            || outputCols[6].intern() != headerSplitCols[6].intern()) {
76          throw new DFInvalidRecord("Wrong output format (header) ["
77              + recordEntry + "]");
78        }
79  
80        String[] values = null;
81  
82        // Data
83        ChukwaRecord record = null;
84  
85        for (int i = 1; i < lines.length; i++) {
86          values = lines[i].split("[\\s]++");
87          key = new ChukwaRecordKey();
88          record = new ChukwaRecord();
89          this.buildGenericRecord(record, null, d.getTime(), "Df");
90  
91          record.add(headerCols[0], values[0]);
92          record.add(headerCols[1], values[1]);
93          record.add(headerCols[2], values[2]);
94          record.add(headerCols[3], values[3]);
95          record.add(headerCols[4], values[4]
96              .substring(0, values[4].length() - 1)); // Remove %
97          record.add(headerCols[5], values[5]);
98  
99          output.collect(key, record);
100       }
101 
102       // log.info("DFProcessor output 1 DF record");
103     } catch (ParseException e) {
104       e.printStackTrace();
105       log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
106       throw e;
107     } catch (IOException e) {
108       e.printStackTrace();
109       log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
110           e);
111       throw e;
112     } catch (DFInvalidRecord e) {
113       e.printStackTrace();
114       log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
115       throw e;
116     }
117   }
118 }