View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.ArrayList;
22  
23  import java.util.HashMap;
24  import java.util.Map.Entry;
25  import java.util.regex.Matcher;
26  import java.util.regex.Pattern;
27  
28  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32  import org.apache.hadoop.mapred.JobHistory;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  
36  @Tables(annotations={
37  @Table(name="Mapreduce",columnFamily="JobData"),
38  @Table(name="Mapreduce",columnFamily="TaskData")
39  })
40  public class JobLog extends AbstractProcessor {
41    private String savedLines = "";
42    
43    /**
44     * Job logs could be split into multiple lines.  
45     * If input recordEntry ends with '"' or '" .', process the line. 
46     * Otherwise, save the log and wait for the next log. 
47     * 
48     * @return An object of JobLogLine if a full job log is found. Null otherwise.
49     */
50    public JobLogLine getJobLogLine(String recordEntry) {
51      if(recordEntry == null) {
52        savedLines = "";
53        return null;
54      }
55      recordEntry = recordEntry.trim();
56      if(recordEntry.length() == 0 || recordEntry.startsWith("Meta")) {
57        savedLines = "";
58        return null;
59      }
60      
61      if(recordEntry.startsWith("Job") 
62          || recordEntry.startsWith("Meta") 
63          || recordEntry.startsWith("Task") 
64          || recordEntry.startsWith("MapAttempt") 
65          || recordEntry.startsWith("ReduceAttempt")) 
66      {
67        savedLines = "";
68      }
69      
70      savedLines += recordEntry;
71      if(!savedLines.endsWith("\"") && !savedLines.endsWith("\" .")) {
72        return null;
73      }
74      
75      JobLogLine line = new JobLogLine(savedLines);
76      return line;
77    }
78    
79  	@Override
80  	protected void parse(String recordEntry,
81  			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
82  			Reporter reporter) throws Throwable 
83  	{
84  	  JobLogLine line = getJobLogLine(recordEntry);
85  	  if(line == null || (!line.getLogType().equals("Meta") 
86  	                      && !line.getLogType().equals("JobData") 
87  	                      && !line.getLogType().equals("TaskData"))) 
88  	  {
89  	    return;
90  	  }
91  	  
92      if(line.getLogType().equals("Meta")) {
93        String streamName = chunk.getStreamName();
94        if(streamName == null) {
95          return;
96        }
97        String jobId = JobLogFileName.getJobIdFromFileName(streamName);
98        if(jobId == null) {
99          return;
100       }
101       line.setLogType("JobData");
102     }
103     
104 		key = new ChukwaRecordKey();
105 		ChukwaRecord record = new ChukwaRecord();
106 		this.buildGenericRecord(record, null, -1l, line.getLogType());
107 		
108 		for (Entry<String, String> entry : line.entrySet()) {
109 			record.add(entry.getKey(), entry.getValue());
110 		}
111 		
112 		for(Entry<String, Long> entry : line.getCounterHash().flat().entrySet()) {
113 			record.add(entry.getKey(), entry.getValue().toString());
114 		}
115 		
116 		long timestamp = line.getTimestamp();
117 		record.setTime(timestamp);
118 		key.setKey(getKey(timestamp, line.getJobId()));
119 		output.collect(key, record);
120 	}
121 	
122 	private String getKey(long ts, String jobId) {
123 		long unit = 60 * 60 * 1000;
124 		if(ts == 0) {
125 		  ts = archiveKey.getTimePartition();
126 		}
127 		long rounded = (ts / unit) * unit;
128 		return rounded + "/" + jobId + "/" + ts;
129 	}
130 
131 	public static class JobLogLine extends HashMap<String, String> {
132 		private static final long serialVersionUID = 4902948603527677036L;
133 		
134 		/**
135 		 * search timestamp from stream. if no timestamp found, use last seen one.
136 		 */
137 		private static final String[] timestampKeys = { 
138 			JobHistory.Keys.SUBMIT_TIME.toString(),
139 			JobHistory.Keys.LAUNCH_TIME.toString(),
140 			JobHistory.Keys.START_TIME.toString(),
141 			JobHistory.Keys.FINISH_TIME.toString(), 
142 		};
143 		private static long lastTimestamp = 0l;
144 
145 		private String logType;
146 		private String jobId;
147 		private String taskId;
148 		private CounterHash counterHash;
149 
150 		/**
151 		 * example lines: 
152 		 * 		Task TASKID="task_200903062215_0577_r_000000" TASK_TYPE="REDUCE" START_TIME="1236386538540" SPLITS="" .
153 		 *		Job JOBID="job_200903062215_0577" JOB_PRIORITY="NORMAL" .
154 		 *		Job JOBID="job_200903062215_0577" LAUNCH_TIME="1236386526545" TOTAL_MAPS="14" TOTAL_REDUCES="1" JOB_STATUS="PREP" .
155 		 */
156 		public JobLogLine(String line) {
157 			line = line.trim();
158 			if (line.length() == 0)
159 				return;
160 
161 			String key = null;
162 			String[] pairs = line.split("=\"");
163 			for (int i = 0; i < pairs.length; i++) {
164 				if (i == 0) {
165 					String[] fields = pairs[i].split(" ");
166 					
167 					logType = fields[0];
168 					if(logType.equals("Job")) {
169 						logType = "JobData";
170 					}
171 					else if (logType.equals("Task") || logType.equals("MapAttempt") || logType.equals("ReduceAttempt")) {
172 						logType = "TaskData";
173 					}
174 					
175 					if (fields.length > 1)
176 						key = fields[1];
177 					continue;
178 				}
179 
180 				int pos = pairs[i].lastIndexOf('"');
181 				String value = pairs[i].substring(0, pos);
182 	                        put(key, value);				  
183 				if(i == (pairs.length-1))
184 					break;
185 				key = pairs[i].substring(pos + 2);
186 			}
187 			
188 			// jobid format: job_200903062215_0577
189 			jobId = get(JobHistory.Keys.JOBID.toString());
190 			
191 			// taskid format: task_200903062215_0577_r_000000
192 			taskId = get(JobHistory.Keys.TASKID.toString());
193 			if(taskId != null) {
194 				String[] fields = taskId.split("_");
195 				jobId = "job_" + fields[1] + "_" + fields[2];
196 				put(JobHistory.Keys.JOBID.toString(), jobId);
197 				taskId = taskId.substring(5);
198 			}
199 			
200 			counterHash = new CounterHash(get(JobHistory.Keys.COUNTERS.toString()));
201 			
202 			if(get("TASK_ATTEMPT_ID") != null) {
203 				put("TASK_ATTEMPT_TIMES", "" + getAttempts());
204 			}
205 			
206 			if(logType.equals("JobData") && get(JobHistory.Keys.FINISH_TIME.toString())!=null) {
207 			  put("JOB_FINAL_STATUS", get("JOB_STATUS"));
208 			}
209 			
210             for(String timeKey : timestampKeys) {
211                 String value = get(timeKey);
212                 if(value == null || value.equals("0")) {
213                     remove(timeKey);
214                 }
215             }
216 		}
217 
218     public String getLogType() {
219       return logType;
220     }
221 
222     public void setLogType(String logType) {
223       this.logType = logType;
224     }
225 
226 		public String getJobId() {
227 			return jobId;
228 		}
229 
230 		public String getTaskId() {
231 			return taskId;
232 		}
233 		
234 		public long getTimestamp() {
235 			for(String key : timestampKeys) {
236 				String value = get(key);
237 				if(value != null && value.length() != 0) {
238 					long ts = Long.parseLong(value);
239 					if(ts > lastTimestamp) {
240 						lastTimestamp = ts;
241 					}
242 					break;
243 				}
244 			}
245 			return lastTimestamp;
246 		}
247 		
248 		public CounterHash getCounterHash() {
249 			return counterHash;
250 		}
251 		
252 		public int getAttempts() {
253 			String attemptId = get("TASK_ATTEMPT_ID");
254 			if(attemptId == null) {
255 				return -1;
256 			}
257 			else {
258 				try {
259 					String[] elems = attemptId.split("_");
260 					return Integer.parseInt(elems[elems.length - 1] + 1);
261 				} catch (NumberFormatException e) {
262 					return -1;
263 				}
264 			}
265 		}
266 	}
267 	
268 	/**
269 	 * Parse counter string to object
270 	 * 
271 	 * Example string:
272 	 * {(org\.apache\.hadoop\.mapred\.JobInProgress$Counter)(Job Counters )
273 		    [(TOTAL_LAUNCHED_REDUCES)(Launched reduce tasks)(1)]
274 		    [(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(14)]
275 		    [(DATA_LOCAL_MAPS)(Data-local map tasks)(14)]
276 		}
277 		{(FileSystemCounters)(FileSystemCounters)
278 		    [(FILE_BYTES_READ)(FILE_BYTES_READ)(132)]
279 		    [(HDFS_BYTES_READ)(HDFS_BYTES_READ)(20471)]
280 		    [(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(790)]
281 		    [(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(248)]
282 		}
283 	 */
284 	public static class CounterHash extends HashMap<String, HashMap<String, Long>>{
285 		public CounterHash(String str) {
286 			if(str == null) {
287 				return;
288 			}
289 			
290 			if(str.startsWith("{")) {
291 				for(String group : split(str, "[{}]")) {
292 					HashMap<String, Long> hash = null; 
293 					for(String counter : split(group, "[\\[\\]]")) {
294 						ArrayList<String> idAndDisplay = split(counter, "[\\(\\)]");
295 						if(hash == null) {
296 							hash = new HashMap<String, Long>();
297 							String groupId = idAndDisplay.get(0).replaceAll("\\\\.", ".");
298 							put(groupId, hash);
299 						}
300 						else {
301 							hash.put(idAndDisplay.get(0), Long.parseLong(idAndDisplay.get(2)));
302 						}
303 					}
304 				}
305 			} else {
306 				HashMap<String, Long> hash = new HashMap<String, Long>();
307 				put("Hadoop18", hash);
308 				for(String counter : split(str, ",")) {
309 					ArrayList<String> kv = split(counter, ":");
310 					hash.put(kv.get(0), Long.parseLong(kv.get(1)));
311 				}
312 			}
313 		}
314 		
315 		/**
316 		 * Flat the counter hashs and add into map passed int. 
317 		 * 
318 		 * For example mentioned in the constructor, the result will be
319 		 * <pre>
320 		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:TOTAL_LAUNCHED_REDUCES=1
321 		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:TOTAL_LAUNCHED_MAPS=14
322 		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:DATA_LOCAL_MAPS=14
323 		 * Counter:FileSystemCounters:FILE_BYTES_READ=132
324 		 * Counter:FileSystemCounters:HDFS_BYTES_READ=20471
325 		 * Counter:FileSystemCounters:FILE_BYTES_WRITTEN=790
326 		 * Counter:FileSystemCounters:HDFS_BYTES_WRITTEN=248
327 		 * </pre>
328 		 */
329 		public HashMap<String, Long> flat() {
330 			HashMap<String, Long> result = new HashMap<String, Long>();
331 			for(Entry<String, HashMap<String, Long>> entry : entrySet()) {
332 				String id = entry.getKey();
333 				for(Entry<String, Long> counterValue : entry.getValue().entrySet()) {
334 					result.put("Counter:" + id + ":" + counterValue.getKey(), counterValue.getValue());
335 				}
336 			}
337 			return result;
338 		}
339 	}
340 	
341 	public static ArrayList<String> split(String s, String regex) {
342 		ArrayList<String> result = new ArrayList<String>();
343 		for(String field : s.split(regex)) {
344 			if(field != null && field.length()>0) {
345 				result.add(field);
346 			}
347 		}
348 		return result;
349 	}
350 
351 	private static class JobLogFileName {
352     private static final Pattern pattern = Pattern.compile("job_[0-9]+_[0-9]+");
353     
354     public static String getJobIdFromFileName(String name) {
355       Matcher matcher = pattern.matcher(name);
356       if (matcher.find()) {
357         return matcher.group(0);
358       }
359       else {
360         return null;
361       }
362     }
363 	}
364 
365 }