1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.ArrayList;
22
23 import java.util.HashMap;
24 import java.util.Map.Entry;
25 import java.util.regex.Matcher;
26 import java.util.regex.Pattern;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32 import org.apache.hadoop.mapred.JobHistory;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35
36 @Tables(annotations={
37 @Table(name="Mapreduce",columnFamily="JobData"),
38 @Table(name="Mapreduce",columnFamily="TaskData")
39 })
40 public class JobLog extends AbstractProcessor {
41 private String savedLines = "";
42
43
44
45
46
47
48
49
50 public JobLogLine getJobLogLine(String recordEntry) {
51 if(recordEntry == null) {
52 savedLines = "";
53 return null;
54 }
55 recordEntry = recordEntry.trim();
56 if(recordEntry.length() == 0 || recordEntry.startsWith("Meta")) {
57 savedLines = "";
58 return null;
59 }
60
61 if(recordEntry.startsWith("Job")
62 || recordEntry.startsWith("Meta")
63 || recordEntry.startsWith("Task")
64 || recordEntry.startsWith("MapAttempt")
65 || recordEntry.startsWith("ReduceAttempt"))
66 {
67 savedLines = "";
68 }
69
70 savedLines += recordEntry;
71 if(!savedLines.endsWith("\"") && !savedLines.endsWith("\" .")) {
72 return null;
73 }
74
75 JobLogLine line = new JobLogLine(savedLines);
76 return line;
77 }
78
79 @Override
80 protected void parse(String recordEntry,
81 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
82 Reporter reporter) throws Throwable
83 {
84 JobLogLine line = getJobLogLine(recordEntry);
85 if(line == null || (!line.getLogType().equals("Meta")
86 && !line.getLogType().equals("JobData")
87 && !line.getLogType().equals("TaskData")))
88 {
89 return;
90 }
91
92 if(line.getLogType().equals("Meta")) {
93 String streamName = chunk.getStreamName();
94 if(streamName == null) {
95 return;
96 }
97 String jobId = JobLogFileName.getJobIdFromFileName(streamName);
98 if(jobId == null) {
99 return;
100 }
101 line.setLogType("JobData");
102 }
103
104 key = new ChukwaRecordKey();
105 ChukwaRecord record = new ChukwaRecord();
106 this.buildGenericRecord(record, null, -1l, line.getLogType());
107
108 for (Entry<String, String> entry : line.entrySet()) {
109 record.add(entry.getKey(), entry.getValue());
110 }
111
112 for(Entry<String, Long> entry : line.getCounterHash().flat().entrySet()) {
113 record.add(entry.getKey(), entry.getValue().toString());
114 }
115
116 long timestamp = line.getTimestamp();
117 record.setTime(timestamp);
118 key.setKey(getKey(timestamp, line.getJobId()));
119 output.collect(key, record);
120 }
121
122 private String getKey(long ts, String jobId) {
123 long unit = 60 * 60 * 1000;
124 if(ts == 0) {
125 ts = archiveKey.getTimePartition();
126 }
127 long rounded = (ts / unit) * unit;
128 return rounded + "/" + jobId + "/" + ts;
129 }
130
131 public static class JobLogLine extends HashMap<String, String> {
132 private static final long serialVersionUID = 4902948603527677036L;
133
134
135
136
137 private static final String[] timestampKeys = {
138 JobHistory.Keys.SUBMIT_TIME.toString(),
139 JobHistory.Keys.LAUNCH_TIME.toString(),
140 JobHistory.Keys.START_TIME.toString(),
141 JobHistory.Keys.FINISH_TIME.toString(),
142 };
143 private static long lastTimestamp = 0l;
144
145 private String logType;
146 private String jobId;
147 private String taskId;
148 private CounterHash counterHash;
149
150
151
152
153
154
155
156 public JobLogLine(String line) {
157 line = line.trim();
158 if (line.length() == 0)
159 return;
160
161 String key = null;
162 String[] pairs = line.split("=\"");
163 for (int i = 0; i < pairs.length; i++) {
164 if (i == 0) {
165 String[] fields = pairs[i].split(" ");
166
167 logType = fields[0];
168 if(logType.equals("Job")) {
169 logType = "JobData";
170 }
171 else if (logType.equals("Task") || logType.equals("MapAttempt") || logType.equals("ReduceAttempt")) {
172 logType = "TaskData";
173 }
174
175 if (fields.length > 1)
176 key = fields[1];
177 continue;
178 }
179
180 int pos = pairs[i].lastIndexOf('"');
181 String value = pairs[i].substring(0, pos);
182 put(key, value);
183 if(i == (pairs.length-1))
184 break;
185 key = pairs[i].substring(pos + 2);
186 }
187
188
189 jobId = get(JobHistory.Keys.JOBID.toString());
190
191
192 taskId = get(JobHistory.Keys.TASKID.toString());
193 if(taskId != null) {
194 String[] fields = taskId.split("_");
195 jobId = "job_" + fields[1] + "_" + fields[2];
196 put(JobHistory.Keys.JOBID.toString(), jobId);
197 taskId = taskId.substring(5);
198 }
199
200 counterHash = new CounterHash(get(JobHistory.Keys.COUNTERS.toString()));
201
202 if(get("TASK_ATTEMPT_ID") != null) {
203 put("TASK_ATTEMPT_TIMES", "" + getAttempts());
204 }
205
206 if(logType.equals("JobData") && get(JobHistory.Keys.FINISH_TIME.toString())!=null) {
207 put("JOB_FINAL_STATUS", get("JOB_STATUS"));
208 }
209
210 for(String timeKey : timestampKeys) {
211 String value = get(timeKey);
212 if(value == null || value.equals("0")) {
213 remove(timeKey);
214 }
215 }
216 }
217
218 public String getLogType() {
219 return logType;
220 }
221
222 public void setLogType(String logType) {
223 this.logType = logType;
224 }
225
226 public String getJobId() {
227 return jobId;
228 }
229
230 public String getTaskId() {
231 return taskId;
232 }
233
234 public long getTimestamp() {
235 for(String key : timestampKeys) {
236 String value = get(key);
237 if(value != null && value.length() != 0) {
238 long ts = Long.parseLong(value);
239 if(ts > lastTimestamp) {
240 lastTimestamp = ts;
241 }
242 break;
243 }
244 }
245 return lastTimestamp;
246 }
247
248 public CounterHash getCounterHash() {
249 return counterHash;
250 }
251
252 public int getAttempts() {
253 String attemptId = get("TASK_ATTEMPT_ID");
254 if(attemptId == null) {
255 return -1;
256 }
257 else {
258 try {
259 String[] elems = attemptId.split("_");
260 return Integer.parseInt(elems[elems.length - 1] + 1);
261 } catch (NumberFormatException e) {
262 return -1;
263 }
264 }
265 }
266 }
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 public static class CounterHash extends HashMap<String, HashMap<String, Long>>{
285 public CounterHash(String str) {
286 if(str == null) {
287 return;
288 }
289
290 if(str.startsWith("{")) {
291 for(String group : split(str, "[{}]")) {
292 HashMap<String, Long> hash = null;
293 for(String counter : split(group, "[\\[\\]]")) {
294 ArrayList<String> idAndDisplay = split(counter, "[\\(\\)]");
295 if(hash == null) {
296 hash = new HashMap<String, Long>();
297 String groupId = idAndDisplay.get(0).replaceAll("\\\\.", ".");
298 put(groupId, hash);
299 }
300 else {
301 hash.put(idAndDisplay.get(0), Long.parseLong(idAndDisplay.get(2)));
302 }
303 }
304 }
305 } else {
306 HashMap<String, Long> hash = new HashMap<String, Long>();
307 put("Hadoop18", hash);
308 for(String counter : split(str, ",")) {
309 ArrayList<String> kv = split(counter, ":");
310 hash.put(kv.get(0), Long.parseLong(kv.get(1)));
311 }
312 }
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 public HashMap<String, Long> flat() {
330 HashMap<String, Long> result = new HashMap<String, Long>();
331 for(Entry<String, HashMap<String, Long>> entry : entrySet()) {
332 String id = entry.getKey();
333 for(Entry<String, Long> counterValue : entry.getValue().entrySet()) {
334 result.put("Counter:" + id + ":" + counterValue.getKey(), counterValue.getValue());
335 }
336 }
337 return result;
338 }
339 }
340
341 public static ArrayList<String> split(String s, String regex) {
342 ArrayList<String> result = new ArrayList<String>();
343 for(String field : s.split(regex)) {
344 if(field != null && field.length()>0) {
345 result.add(field);
346 }
347 }
348 return result;
349 }
350
351 private static class JobLogFileName {
352 private static final Pattern pattern = Pattern.compile("job_[0-9]+_[0-9]+");
353
354 public static String getJobIdFromFileName(String name) {
355 Matcher matcher = pattern.matcher(name);
356 if (matcher.find()) {
357 return matcher.group(0);
358 }
359 else {
360 return null;
361 }
362 }
363 }
364
365 }