1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.mapred;
20
21
22 import java.util.HashMap;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
26 import org.apache.hadoop.chukwa.util.ExceptionUtil;
27 import org.apache.hadoop.fs.Path;
28
29 public class ChukwaJobTrackerInstrumentation extends
30 org.apache.hadoop.mapred.JobTrackerInstrumentation {
31
32 protected final JobTracker tracker;
33 private static ChukwaAgentController chukwaClient = null;
34 private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
35 private static HashMap<JobID, String> jobHistories = null;
36
37 public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
38 super(jt, conf);
39 tracker = jt;
40 if (chukwaClient == null) {
41 chukwaClient = new ChukwaAgentController();
42 }
43 if (jobHistories == null) {
44 jobHistories = new HashMap<JobID, String>();
45 }
46 }
47
48
49
50 public void submitJob(JobConf conf, JobID id) {
51 super.submitJob(conf,id);
52 try {
53 String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
54 Path jobHistoryPath = JobHistory.JobInfo
55 .getJobHistoryLogLocation(jobFileName);
56 String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
57 String adaptorID = chukwaClient
58 .add(
59 "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor",
60 "JobConf", "0 " + jobConfPath, 0);
61 if (jobHistoryPath.toString().matches("^hdfs://")) {
62 adaptorID = chukwaClient.add(
63 "org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor",
64 "JobHistory", "0 " + jobHistoryPath.toString(), 0);
65 } else {
66 adaptorID = chukwaClient
67 .add(
68 "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
69 "JobHistory", "0 " + jobHistoryPath.toString().substring(5), 0);
70 }
71 jobHistories.put(id, adaptorID);
72 } catch (Exception ex) {
73 log.warn(ExceptionUtil.getStackTrace(ex));
74 }
75 }
76
77 public void finalizeJob(JobConf conf, JobID id) {
78 super.finalizeJob(conf,id);
79 try {
80 if (jobHistories.containsKey(id)) {
81 chukwaClient.remove(jobHistories.get(id));
82 }
83 } catch (Throwable e) {
84 log.warn("could not remove adaptor for this job: " + id.toString(), e);
85 e.printStackTrace();
86 }
87 }
88
89 }