1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
27 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
28 import org.apache.hadoop.mapred.OutputCollector;
29 import org.apache.hadoop.mapred.Reporter;
30 import org.apache.log4j.Logger;
31
32 public class PbsNodes extends AbstractProcessor {
33 static Logger log = Logger.getLogger(PbsNodes.class);
34
35 private static final String rawPBSRecordType = "PbsNodes";
36 private static final String machinePBSRecordType = "MachinePbsNodes";
37 private SimpleDateFormat sdf = null;
38
39 public PbsNodes() {
40
41 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
42 }
43
44 @Override
45 protected void parse(String recordEntry,
46 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
47 throws Throwable {
48
49
50
51
52 StringBuilder sb = new StringBuilder();
53 int i = 0;
54 String nodeActivityStatus = null;
55 StringBuilder sbFreeMachines = new StringBuilder();
56 StringBuilder sbUsedMachines = new StringBuilder();
57 StringBuilder sbDownMachines = new StringBuilder();
58
59 int totalFreeNode = 0;
60 int totalUsedNode = 0;
61 int totalDownNode = 0;
62
63 String body = null;
64 ChukwaRecord record = null;
65
66 try {
67
68 String dStr = recordEntry.substring(0, 23);
69 int start = 24;
70 int idx = recordEntry.indexOf(' ', start);
71
72 start = idx + 1;
73 idx = recordEntry.indexOf(' ', start);
74
75 body = recordEntry.substring(idx + 1);
76
77 Date d = sdf.parse(dStr);
78
79 String[] lines = body.split("\n");
80 while (i < lines.length) {
81 while ((i < lines.length) && (lines[i].trim().length() > 0)) {
82 sb.append(lines[i].trim()).append("\n");
83 i++;
84 }
85
86 if ((i < lines.length) && (lines[i].trim().length() > 0)) {
87 throw new PbsInvalidEntry(recordEntry);
88 }
89
90
91 i++;
92
93 if (sb.length() > 0) {
94 body = sb.toString();
95
96
97
98 record = new ChukwaRecord();
99 key = new ChukwaRecordKey();
100
101 buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
102 parsePbsRecord(body, record);
103
104
105 output.collect(key, record);
106
107
108
109 nodeActivityStatus = record.getValue("state");
110 if (nodeActivityStatus != null) {
111 if (nodeActivityStatus.equals("free")) {
112 totalFreeNode++;
113 sbFreeMachines.append(record.getValue("Machine")).append(",");
114 } else if (nodeActivityStatus.equals("job-exclusive")) {
115 totalUsedNode++;
116 sbUsedMachines.append(record.getValue("Machine")).append(",");
117 } else {
118 totalDownNode++;
119 sbDownMachines.append(record.getValue("Machine")).append(",");
120 }
121 }
122 sb = new StringBuilder();
123 }
124 }
125
126
127
128 record = new ChukwaRecord();
129 key = new ChukwaRecordKey();
130 buildGenericRecord(record, null, d.getTime(), "NodeActivity");
131
132 record.setTime(d.getTime());
133 record.add("used", "" + totalUsedNode);
134 record.add("free", "" + totalFreeNode);
135 record.add("down", "" + totalDownNode);
136 record.add("usedMachines", sbUsedMachines.toString());
137 record.add("freeMachines", sbFreeMachines.toString());
138 record.add("downMachines", sbDownMachines.toString());
139
140 output.collect(key, record);
141
142 } catch (ParseException e) {
143 e.printStackTrace();
144 log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
145 throw e;
146 } catch (IOException e) {
147 log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
148 + "]", e);
149 e.printStackTrace();
150 throw e;
151 } catch (PbsInvalidEntry e) {
152 log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
153 e.printStackTrace();
154 throw e;
155 }
156
157 }
158
159 protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
160 int i = 0;
161 String[] lines = recordLine.split("\n");
162 record.add("Machine", lines[0]);
163
164 i++;
165 String[] data = null;
166 while (i < lines.length) {
167 data = extractFields(lines[i]);
168 record.add(data[0].trim(), data[1].trim());
169 if (data[0].trim().equalsIgnoreCase("status")) {
170 parseStatusField(data[1].trim(), record);
171 }
172 i++;
173 }
174 }
175
176 protected static void parseStatusField(String statusField, ChukwaRecord record) {
177 String[] data = null;
178 String[] subFields = statusField.trim().split(",");
179 for (String subflied : subFields) {
180 data = extractFields(subflied);
181 record.add("status-" + data[0].trim(), data[1].trim());
182 }
183 }
184
185 static String[] extractFields(String line) {
186 String[] args = new String[2];
187 int index = line.indexOf("=");
188 args[0] = line.substring(0, index);
189 args[1] = line.substring(index + 1);
190
191 return args;
192 }
193
194 public String getDataType() {
195 return PbsNodes.rawPBSRecordType;
196 }
197
198 }