View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.io.IOException;
23  import java.text.ParseException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
27  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
28  import org.apache.hadoop.mapred.OutputCollector;
29  import org.apache.hadoop.mapred.Reporter;
30  import org.apache.log4j.Logger;
31  
32  public class PbsNodes extends AbstractProcessor {
33    static Logger log = Logger.getLogger(PbsNodes.class);
34  
35    private static final String rawPBSRecordType = "PbsNodes";
36    private static final String machinePBSRecordType = "MachinePbsNodes";
37    private SimpleDateFormat sdf = null;
38  
39    public PbsNodes() {
40      // TODO move that to config
41      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
42    }
43  
44    @Override
45    protected void parse(String recordEntry,
46        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
47        throws Throwable {
48  
49      // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
50      // chunk.getDataType() + "]");
51  
52      StringBuilder sb = new StringBuilder();
53      int i = 0;
54      String nodeActivityStatus = null;
55      StringBuilder sbFreeMachines = new StringBuilder();
56      StringBuilder sbUsedMachines = new StringBuilder();
57      StringBuilder sbDownMachines = new StringBuilder();
58  
59      int totalFreeNode = 0;
60      int totalUsedNode = 0;
61      int totalDownNode = 0;
62  
63      String body = null;
64      ChukwaRecord record = null;
65  
66      try {
67  
68        String dStr = recordEntry.substring(0, 23);
69        int start = 24;
70        int idx = recordEntry.indexOf(' ', start);
71        // String level = recordEntry.substring(start, idx);
72        start = idx + 1;
73        idx = recordEntry.indexOf(' ', start);
74        // String className = recordEntry.substring(start, idx-1);
75        body = recordEntry.substring(idx + 1);
76  
77        Date d = sdf.parse(dStr);
78  
79        String[] lines = body.split("\n");
80        while (i < lines.length) {
81          while ((i < lines.length) && (lines[i].trim().length() > 0)) {
82            sb.append(lines[i].trim()).append("\n");
83            i++;
84          }
85  
86          if ((i < lines.length) && (lines[i].trim().length() > 0)) {
87            throw new PbsInvalidEntry(recordEntry);
88          }
89  
90          // Empty line
91          i++;
92  
93          if (sb.length() > 0) {
94            body = sb.toString();
95            // Process all entries for a machine
96            // System.out.println("=========>>> Record [" + body+ "]");
97  
98            record = new ChukwaRecord();
99            key = new ChukwaRecordKey();
100 
101           buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
102           parsePbsRecord(body, record);
103 
104           // Output PbsNode record for 1 machine
105           output.collect(key, record);
106           // log.info("PbsNodeProcessor output 1 sub-record");
107 
108           // compute Node Activity information
109           nodeActivityStatus = record.getValue("state");
110           if (nodeActivityStatus != null) {
111             if (nodeActivityStatus.equals("free")) {
112               totalFreeNode++;
113               sbFreeMachines.append(record.getValue("Machine")).append(",");
114             } else if (nodeActivityStatus.equals("job-exclusive")) {
115               totalUsedNode++;
116               sbUsedMachines.append(record.getValue("Machine")).append(",");
117             } else {
118               totalDownNode++;
119               sbDownMachines.append(record.getValue("Machine")).append(",");
120             }
121           }
122           sb = new StringBuilder();
123         }
124       }
125 
126       // End of parsing
127 
128       record = new ChukwaRecord();
129       key = new ChukwaRecordKey();
130       buildGenericRecord(record, null, d.getTime(), "NodeActivity");
131 
132       record.setTime(d.getTime());
133       record.add("used", "" + totalUsedNode);
134       record.add("free", "" + totalFreeNode);
135       record.add("down", "" + totalDownNode);
136       record.add("usedMachines", sbUsedMachines.toString());
137       record.add("freeMachines", sbFreeMachines.toString());
138       record.add("downMachines", sbDownMachines.toString());
139 
140       output.collect(key, record);
141       // log.info("PbsNodeProcessor output 1 NodeActivity");
142     } catch (ParseException e) {
143       e.printStackTrace();
144       log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
145       throw e;
146     } catch (IOException e) {
147       log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
148           + "]", e);
149       e.printStackTrace();
150       throw e;
151     } catch (PbsInvalidEntry e) {
152       log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
153       e.printStackTrace();
154       throw e;
155     }
156 
157   }
158 
159   protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
160     int i = 0;
161     String[] lines = recordLine.split("\n");
162     record.add("Machine", lines[0]);
163 
164     i++;
165     String[] data = null;
166     while (i < lines.length) {
167       data = extractFields(lines[i]);
168       record.add(data[0].trim(), data[1].trim());
169       if (data[0].trim().equalsIgnoreCase("status")) {
170         parseStatusField(data[1].trim(), record);
171       }
172       i++;
173     }
174   }
175 
176   protected static void parseStatusField(String statusField, ChukwaRecord record) {
177     String[] data = null;
178     String[] subFields = statusField.trim().split(",");
179     for (String subflied : subFields) {
180       data = extractFields(subflied);
181       record.add("status-" + data[0].trim(), data[1].trim());
182     }
183   }
184 
185   static String[] extractFields(String line) {
186     String[] args = new String[2];
187     int index = line.indexOf("=");
188     args[0] = line.substring(0, index);
189     args[1] = line.substring(index + 1);
190 
191     return args;
192   }
193 
194   public String getDataType() {
195     return PbsNodes.rawPBSRecordType;
196   }
197 
198 }