View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.mapred;
20  
21  
22  import java.util.HashMap;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
26  import org.apache.hadoop.chukwa.util.ExceptionUtil;
27  import org.apache.hadoop.fs.Path;
28  
29  public class ChukwaJobTrackerInstrumentation extends
30      org.apache.hadoop.mapred.JobTrackerInstrumentation {
31  
32    protected final JobTracker tracker;
33    private static ChukwaAgentController chukwaClient = null;
34    private static Log log = LogFactory.getLog(JobTrackerInstrumentation.class);
35    private static HashMap<JobID, String> jobHistories = null;
36  
37    public ChukwaJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
38      super(jt, conf);
39      tracker = jt;
40      if (chukwaClient == null) {
41        chukwaClient = new ChukwaAgentController();
42      }
43      if (jobHistories == null) {
44        jobHistories = new HashMap<JobID, String>();
45      }
46    }
47  
48  
49  
50    public void submitJob(JobConf conf, JobID id) {
51      super.submitJob(conf,id);
52      try {
53        String jobFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
54        Path jobHistoryPath = JobHistory.JobInfo
55            .getJobHistoryLogLocation(jobFileName);
56        String jobConfPath = JobHistory.JobInfo.getLocalJobFilePath(id);
57        String adaptorID = chukwaClient
58            .add(
59                "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor",
60                "JobConf", "0 " + jobConfPath, 0);
61        if (jobHistoryPath.toString().matches("^hdfs://")) {
62          adaptorID = chukwaClient.add(
63              "org.apache.hadoop.chukwa.datacollection.adaptor.HDFSAdaptor",
64              "JobHistory", "0 " + jobHistoryPath.toString(), 0);
65        } else {
66          adaptorID = chukwaClient
67              .add(
68                  "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
69                  "JobHistory", "0 " + jobHistoryPath.toString().substring(5), 0);
70        }
71        jobHistories.put(id, adaptorID);
72      } catch (Exception ex) {
73        log.warn(ExceptionUtil.getStackTrace(ex));
74      }
75    }
76  
77    public void finalizeJob(JobConf conf, JobID id) {
78      super.finalizeJob(conf,id);
79      try {
80        if (jobHistories.containsKey(id)) {
81          chukwaClient.remove(jobHistories.get(id));
82        }
83      } catch (Throwable e) {
84        log.warn("could not remove adaptor for this job: " + id.toString(), e);
85        e.printStackTrace();
86      }
87    }
88  
89  }