View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.BufferedWriter;
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.FilenameFilter;
29  import java.io.IOException;
30  import java.io.InputStreamReader;
31  import java.io.OutputStreamWriter;
32  import java.io.PrintWriter;
33  import java.security.NoSuchAlgorithmException;
34  import java.util.*;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.regex.Matcher;
37  import java.util.regex.Pattern;
38  
39  import org.apache.hadoop.chukwa.datacollection.DataFactory;
40  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
41  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
42  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
43  import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
44  import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
45  import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
46  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
47  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
48  import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
49  import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
50  import org.apache.hadoop.chukwa.util.DaemonWatcher;
51  import org.apache.hadoop.chukwa.util.ExceptionUtil;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.Path;
54  import org.apache.log4j.Logger;
55  
56  import org.mortbay.jetty.Server;
57  import org.mortbay.jetty.servlet.Context;
58  import org.mortbay.jetty.servlet.ServletHolder;
59  import org.mortbay.jetty.nio.SelectChannelConnector;
60  import org.mortbay.thread.BoundedThreadPool;
61  import com.sun.jersey.spi.container.servlet.ServletContainer;
62  import edu.berkeley.confspell.*;
63  
64  /**
65   * The local agent daemon that runs on each machine. This class is designed to
66   * be embeddable, for use in testing.
67   * <P>
68   * The agent will start an HTTP REST interface listening on port. Configs for
69   * the agent are:
70   * <ul>
71   * <li><code>chukwaAgent.http.port</code> Port to listen on (default=9090).</li>
72   * <li><code>chukwaAgent.http.rest.controller.packages</code> Java packages to
73   * inspect for JAX-RS annotated classes to be added as servlets to the REST
74   * server.</li>
75   * </ul>
76   * 
77   */
78  public class ChukwaAgent implements AdaptorManager {
79    // boolean WRITE_CHECKPOINTS = true;
80    static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
81  
82    private static final int HTTP_SERVER_THREADS = 120;
83    private static Server jettyServer = null;
84    private OffsetStatsManager adaptorStatsManager = null;
85    private Timer statsCollector = null;
86  
87    static Logger log = Logger.getLogger(ChukwaAgent.class);
88    static ChukwaAgent agent = null;
89  
90    public static ChukwaAgent getAgent() {
91      return agent;
92    }
93  
94    static Configuration conf = null;
95    Connector connector = null;
96  
97    // doesn't need an equals(), comparator, etc
98    public static class Offset {
99      public Offset(long l, String id) {
100       offset = l;
101       this.id = id;
102     }
103 
104     final String id;
105     volatile long offset;
106     public long offset() {
107       return this.offset;
108     }
109     
110     public String adaptorID() {
111       return id;
112     }
113   }
114 
115   public static class AlreadyRunningException extends Exception {
116 
117     private static final long serialVersionUID = 1L;
118 
119     public AlreadyRunningException() {
120       super("Agent already running; aborting");
121     }
122   }
123 
124   private final Map<Adaptor, Offset> adaptorPositions;
125 
126   // basically only used by the control socket thread.
127   //must be locked before access
128   private final Map<String, Adaptor> adaptorsByName;
129 
130   private File checkpointDir; // lock this object to indicate checkpoint in
131   // progress
132   private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
133   // checkpoints
134   private static String tags = "";
135 
136   private Timer checkpointer;
137   private volatile boolean needNewCheckpoint = false; // set to true if any
138   // event has happened
139   // that should cause a new checkpoint to be written
140   private int checkpointNumber; // id number of next checkpoint.
141   // should be protected by grabbing lock on checkpointDir
142 
143   private final AgentControlSocketListener controlSock;
144 
145   public int getControllerPort() {
146     return controlSock.getPort();
147   }
148 
149   public OffsetStatsManager getAdaptorStatsManager() {
150     return adaptorStatsManager;
151   }
152 
153   /**
154    * @param args
155    * @throws AdaptorException
156    */
157   public static void main(String[] args) throws AdaptorException {
158 
159     DaemonWatcher.createInstance("Agent");
160 
161     try {
162       if (args.length > 0 && args[0].equals("-help")) {
163         System.out.println("usage:  LocalAgent [-noCheckPoint]"
164             + "[default collector URL]");
165         System.exit(0);
166       }
167 
168       conf = readConfig();
169       agent = new ChukwaAgent(conf);
170       
171       if (agent.anotherAgentIsRunning()) {
172         System.out
173             .println("another agent is running (or port has been usurped). "
174                 + "Bailing out now");
175         throw new AlreadyRunningException();
176       }
177 
178       int uriArgNumber = 0;
179       if (args.length > 0) {
180         if (args[uriArgNumber].equals("local"))
181           agent.connector = new ConsoleOutConnector(agent);
182         else {
183           if (!args[uriArgNumber].contains("://"))
184             args[uriArgNumber] = "http://" + args[uriArgNumber];
185           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
186         }
187       } else
188         agent.connector = new HttpConnector(agent);
189 
190       agent.connector.start();
191 
192       log.info("local agent started on port " + agent.getControlSock().portno);
193       System.out.close();
194       System.err.close();
195     } catch (AlreadyRunningException e) {
196       log.error("agent started already on this machine with same portno;"
197           + " bailing out");
198       System.out
199           .println("agent started already on this machine with same portno;"
200               + " bailing out");
201       DaemonWatcher.bailout(-1);
202       System.exit(0); // better safe than sorry
203     } catch (Exception e) {
204       e.printStackTrace();
205     }
206   }
207 
208   private boolean anotherAgentIsRunning() {
209     return !controlSock.isBound();
210   }
211 
212   /**
213    * @return the number of running adaptors inside this local agent
214    */
215   @Override
216   public int adaptorCount() {
217     synchronized(adaptorsByName) {
218       return adaptorsByName.size();
219     }
220   }
221 
222   public ChukwaAgent() throws AlreadyRunningException {
223     this(new Configuration());
224   }
225 
226   public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
227     ChukwaAgent.agent = this;
228     this.conf = conf;
229     
230     // almost always just reading this; so use a ConcurrentHM.
231     // since we wrapped the offset, it's not a structural mod.
232     adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
233     adaptorsByName = new HashMap<String, Adaptor>();
234     checkpointNumber = 0;
235 
236     boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
237         "chukwaAgent.checkpoint.enabled", true);
238     CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
239         "chukwa_checkpoint_");
240     final int CHECKPOINT_INTERVAL_MS = conf.getInt(
241         "chukwaAgent.checkpoint.interval", 5000);
242     final int STATS_INTERVAL_MS = conf.getInt(
243         "chukwaAgent.stats.collection.interval", 10000);
244     final int STATS_DATA_TTL_MS = conf.getInt(
245         "chukwaAgent.stats.data.ttl", 1200000);
246 
247     if (conf.get("chukwaAgent.checkpoint.dir") != null)
248       checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
249     else
250       DO_CHECKPOINT_RESTORE = false;
251 
252     if (checkpointDir != null && !checkpointDir.exists()) {
253       checkpointDir.mkdirs();
254     }
255     tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
256     DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
257 
258     log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
259     log.info("Config - checkpointDir: [" + checkpointDir + "]");
260     log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
261         + "]");
262     log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
263     log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
264     log.info("Config - tags: [" + tags + "]");
265 
266     if (DO_CHECKPOINT_RESTORE) {
267       log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
268     }
269 
270     File initialAdaptors = null;
271     if (conf.get("chukwaAgent.initial_adaptors") != null)
272       initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
273 
274     try {
275       if (DO_CHECKPOINT_RESTORE) {
276         restoreFromCheckpoint();
277       }
278     } catch (IOException e) {
279       log.warn("failed to restart from checkpoint: ", e);
280     }
281 
282     try {
283       if (initialAdaptors != null && initialAdaptors.exists())
284         readAdaptorsFile(initialAdaptors); 
285     } catch (IOException e) {
286       log.warn("couldn't read user-specified file "
287           + initialAdaptors.getAbsolutePath());
288     }
289 
290     controlSock = new AgentControlSocketListener(this);
291     try {
292       controlSock.tryToBind(); // do this synchronously; if it fails, we know
293       // another agent is running.
294       controlSock.start(); // this sets us up as a daemon
295       log.info("control socket started on port " + controlSock.portno);
296 
297       // start the HTTP server with stats collection
298       try {
299         this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
300         this.statsCollector = new Timer("ChukwaAgent Stats Collector");
301 
302         startHttpServer(conf);
303 
304         statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
305                 STATS_INTERVAL_MS, STATS_INTERVAL_MS);
306       } catch (Exception e) {
307         log.error("Couldn't start HTTP server", e);
308         throw new RuntimeException(e);
309       }
310 
311       // shouldn't start checkpointing until we're finishing launching
312       // adaptors on boot
313       if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
314         checkpointer = new Timer();
315         checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
316       }
317     } catch (IOException e) {
318       log.info("failed to bind to socket; aborting agent launch", e);
319       throw new AlreadyRunningException();
320     }
321 
322   }
323 
324   private void startHttpServer(Configuration conf) throws Exception {
325     int portNum = conf.getInt("chukwaAgent.http.port", 9090);
326     String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
327     StringBuilder jaxRsPackages = new StringBuilder(
328             "org.apache.hadoop.chukwa.datacollection.agent.rest");
329 
330     // Allow the ability to add additional servlets to the server
331     if (jaxRsAddlPackages != null)
332       jaxRsPackages.append(';').append(jaxRsAddlPackages);
333 
334     // Set up jetty connector
335     SelectChannelConnector jettyConnector = new SelectChannelConnector();
336     jettyConnector.setLowResourcesConnections(HTTP_SERVER_THREADS - 10);
337     jettyConnector.setLowResourceMaxIdleTime(1500);
338     jettyConnector.setPort(portNum);
339 
340     // Set up jetty server, using connector
341     jettyServer = new Server(portNum);
342     jettyServer.setConnectors(new org.mortbay.jetty.Connector[] { jettyConnector });
343     BoundedThreadPool pool = new BoundedThreadPool();
344     pool.setMaxThreads(HTTP_SERVER_THREADS);
345     jettyServer.setThreadPool(pool);
346 
347     // Create the controller servlets
348     ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
349     servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
350             "com.sun.jersey.api.core.PackagesResourceConfig");
351     servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
352             jaxRsPackages.toString());
353 
354     // Create the server context and add the servlet
355     Context root = new Context(jettyServer, "/rest/v1", Context.SESSIONS);
356     root.setAttribute("ChukwaAgent", this);
357     root.addServlet(servletHolder, "/*");
358     root.setAllowNullPathInfo(false);
359 
360     // And finally, fire up the server
361     jettyServer.start();
362     jettyServer.setStopAtShutdown(true);
363 
364     log.info("started Chukwa http agent interface on port " + portNum);
365   }
366 
367   /**
368    * Take snapshots of offset data so we can report flow rate stats.
369    */
370   private class StatsCollectorTask extends TimerTask {
371 
372     public void run() {
373       long now = System.currentTimeMillis();
374 
375       for(String adaptorId : getAdaptorList().keySet()) {
376         Adaptor adaptor = getAdaptor(adaptorId);
377         if(adaptor == null) continue;
378 
379         Offset offset = adaptorPositions.get(adaptor);
380         if(offset == null) continue;
381 
382         adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
383       }
384     }
385   }
386 
387   // words should contain (space delimited):
388   // 0) command ("add")
389   // 1) Optional adaptor name, followed by =
390   // 2) AdaptorClassname
391   // 3) dataType (e.g. "hadoop_log")
392   // 4) params <optional>
393   // (e.g. for files, this is filename,
394   // but can be arbitrarily many space
395   // delimited agent specific params )
396   // 5) offset
397   private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+" // command "add",
398                                                              // any case, plus
399                                                              // at least one
400                                                              // space
401       + "(?:"   //noncapturing group
402       +	"([^\\s=]+)" //containing a string (captured) 
403       + "\\s*=\\s*" //then an equals sign, potentially set off with whitespace
404       + ")?" //end optional noncapturing group 
405       + "([^\\s=]+)\\s+" // the adaptor classname, plus at least one space. No '=' in name
406       + "(\\S+)\\s+" // datatype, plus at least one space
407       + "(?:" // start a non-capturing group, for the parameters
408       + "(.*?)\\s+" // capture the actual parameters reluctantly, followed by
409                     // whitespace
410       + ")?" // end non-matching group for params; group is optional
411       + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
412 
413   /**
414    * Most of the Chukwa wire protocol is implemented in @link{AgentControlSocketListener}
415    * 
416    * Unlike the rest of the chukwa wire protocol, add commands can appear in
417    * initial_adaptors and checkpoint files. So it makes sense to handle them here.
418    * 
419    */
420   public String processAddCommand(String cmd) {
421     try {
422       return processAddCommandE(cmd);
423     } catch(AdaptorException e) {
424       return null;
425     }
426   }
427   
428 
429   public String processAddCommandE(String cmd) throws AdaptorException {
430     Matcher m = addCmdPattern.matcher(cmd);
431     if (m.matches()) {
432       long offset; // check for obvious errors first
433       try {
434         offset = Long.parseLong(m.group(5));
435       } catch (NumberFormatException e) {
436         log.warn("malformed line " + cmd);
437         throw new AdaptorException("bad input syntax");
438       }
439 
440       String adaptorID = m.group(1);
441       String adaptorClassName = m.group(2);
442       String dataType = m.group(3);
443       String params = m.group(4);
444       if (params == null)
445         params = "";
446       
447       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
448       if (adaptor == null) {
449         log.warn("Error creating adaptor of class " + adaptorClassName);
450         throw new AdaptorException("Can't load class " + adaptorClassName);
451       }
452       String coreParams = adaptor.parseArgs(dataType,params,this);
453       if(coreParams == null) {
454         log.warn("invalid params for adaptor: " + params);       
455         throw new AdaptorException("invalid params for adaptor: " + params);
456       }
457       
458       if(adaptorID == null) { //user didn't specify, so synthesize
459         try {
460          adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
461         } catch(NoSuchAlgorithmException e) {
462           log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
463           shutdown(true);
464         }
465       } else if(!adaptorID.startsWith("adaptor_"))
466         adaptorID = "adaptor_"+adaptorID;
467       
468       synchronized (adaptorsByName) {
469         
470         if(adaptorsByName.containsKey(adaptorID))
471           return adaptorID;
472         adaptorsByName.put(adaptorID, adaptor);
473         adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
474         needNewCheckpoint = true;
475         try {
476           adaptor.start(adaptorID, dataType, offset, DataFactory
477               .getInstance().getEventQueue());
478           log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
479           ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
480           ChukwaAgent.agentMetrics.addedAdaptor.inc();
481           return adaptorID;
482 
483         } catch (Exception e) {
484           Adaptor failed = adaptorsByName.remove(adaptorID);
485           adaptorPositions.remove(failed);
486           adaptorStatsManager.remove(failed);
487           log.warn("failed to start adaptor", e);
488           if(e instanceof AdaptorException)
489             throw (AdaptorException)e;
490         }
491       }
492     } else if (cmd.length() > 0)
493       log.warn("only 'add' command supported in config files; cmd was: " + cmd);
494     // no warning for blank line
495 
496     return null;
497   }
498 
499 
500 
501   /**
502    * Tries to restore from a checkpoint file in checkpointDir. There should
503    * usually only be one checkpoint present -- two checkpoints present implies a
504    * crash during writing the higher-numbered one. As a result, this method
505    * chooses the lowest-numbered file present.
506    * 
507    * Lines in the checkpoint file are processed one at a time with
508    * processCommand();
509    * 
510    * @return true if the restore succeeded
511    * @throws IOException
512    */
513   private boolean restoreFromCheckpoint() throws IOException {
514     synchronized (checkpointDir) {
515       String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
516         public boolean accept(File dir, String name) {
517           return name.startsWith(CHECKPOINT_BASE_NAME);
518         }
519       });
520 
521       if (checkpointNames == null) {
522         log.error("Unable to list files in checkpoint dir");
523         return false;
524       }
525       if (checkpointNames.length == 0) {
526         log.info("No checkpoints found in " + checkpointDir);
527         return false;
528       }
529 
530       if (checkpointNames.length > 2)
531         log.warn("expected at most two checkpoint files in " + checkpointDir
532             + "; saw " + checkpointNames.length);
533       else if (checkpointNames.length == 0)
534         return false;
535 
536       String lowestName = null;
537       int lowestIndex = Integer.MAX_VALUE;
538       for (String n : checkpointNames) {
539         int index = Integer
540             .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
541         if (index < lowestIndex) {
542           lowestName = n;
543           lowestIndex = index;
544         }
545       }
546 
547       checkpointNumber = lowestIndex + 1;
548       File checkpoint = new File(checkpointDir, lowestName);
549       readAdaptorsFile(checkpoint);
550     }
551     return true;
552   }
553 
554   private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
555       IOException {
556     log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
557     BufferedReader br = new BufferedReader(new InputStreamReader(
558         new FileInputStream(checkpoint)));
559     String cmd = null;
560     while ((cmd = br.readLine()) != null)
561       processAddCommand(cmd);
562     br.close();
563   }
564 
565   /**
566    * Called periodically to write checkpoints
567    * 
568    * @throws IOException
569    */
570   private void writeCheckpoint() throws IOException {
571     needNewCheckpoint = false;
572     synchronized (checkpointDir) {
573       log.info("writing checkpoint " + checkpointNumber);
574 
575       FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
576           CHECKPOINT_BASE_NAME + checkpointNumber));
577       PrintWriter out = new PrintWriter(new BufferedWriter(
578           new OutputStreamWriter(fos)));
579 
580       for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
581         out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
582       }
583 
584       out.close();
585       File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
586           + (checkpointNumber - 1));
587       log.debug("hopefully removing old checkpoint file "
588           + lastCheckpoint.getAbsolutePath());
589       lastCheckpoint.delete();
590       checkpointNumber++;
591     }
592   }
593 
594   public String reportCommit(Adaptor src, long uuid) {
595     needNewCheckpoint = true;
596     Offset o = adaptorPositions.get(src);
597     if (o != null) {
598       synchronized (o) { // order writes to offset, in case commits are
599                          // processed out of order
600         if (uuid > o.offset)
601           o.offset = uuid;
602       }
603       log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
604       if(src instanceof NotifyOnCommitAdaptor) {
605         ((NotifyOnCommitAdaptor) src).committed(uuid);
606       }
607       return o.id;
608     } else {
609       log.warn("got commit up to " + uuid + "  for adaptor " + src
610           + " that doesn't appear to be running: " + adaptorCount()
611           + " total");
612       return null;
613     }
614   }
615 
616   private class CheckpointTask extends TimerTask {
617     public void run() {
618       try {
619         if (needNewCheckpoint) {
620           writeCheckpoint();
621         }
622       } catch (IOException e) {
623         log.warn("failed to write checkpoint", e);
624       }
625     }
626   }
627 
628   
629   private String formatAdaptorStatus(Adaptor a) {
630     return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + 
631    " " + adaptorPositions.get(a).offset;
632   }
633   
634 /**
635  * Expose the adaptor list.  Keys are adaptor ID numbers, values are the 
636  * adaptor status strings.
637  * @return
638  */
639   public Map<String, String> getAdaptorList() {
640     Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
641     synchronized (adaptorsByName) {
642       for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
643         adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
644       }
645     }
646     return adaptors;
647   }
648   
649 
650   public long stopAdaptor(String name, boolean gracefully) {
651     if (gracefully) 
652       return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
653     else
654       return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
655   }
656 
657   /**
658    * Stop the adaptor with given ID number. Takes a parameter to indicate
659    * whether the adaptor should force out all remaining data, or just exit
660    * abruptly.
661    * 
662    * If the adaptor is written correctly, its offset won't change after
663    * returning from shutdown.
664    * 
665    * @param name the adaptor to stop
666    * @param shutdownMode if true, shutdown, if false, hardStop
667    * @return the number of bytes synched at stop. -1 on error
668    */
669   public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
670     Adaptor toStop;
671     long offset = -1;
672 
673     // at most one thread can get past this critical section with toStop != null
674     // so if multiple callers try to stop the same adaptor, all but one will
675     // fail
676     synchronized (adaptorsByName) {
677       toStop = adaptorsByName.remove(name);
678     }
679     if (toStop == null) {
680       log.warn("trying to stop " + name + " that isn't running");
681       return offset;
682     } else {
683       adaptorPositions.remove(toStop);
684       adaptorStatsManager.remove(toStop);
685     }
686     ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
687     ChukwaAgent.agentMetrics.removedAdaptor.inc();
688     
689     try {
690       offset = toStop.shutdown(shutdownMode);
691       log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
692           + toStop.getCurrentStatus());
693     } catch (AdaptorException e) {
694       log.error("adaptor failed to stop cleanly", e);
695     } finally {
696       needNewCheckpoint = true;
697     }
698     return offset;
699   }
700 
701   @Override
702   public Configuration getConfiguration() {
703     return conf;
704   }
705   
706   @Override
707   public Adaptor getAdaptor(String name) {
708     synchronized(adaptorsByName) {
709       return adaptorsByName.get(name);
710     }
711   }
712 
713   public Offset offset(Adaptor a) {
714     Offset o = adaptorPositions.get(a);
715     return o;
716   }
717   
718   Connector getConnector() {
719     return connector;
720   }
721 
722   private static Configuration readConfig() {
723     Configuration conf = new Configuration();
724 
725     String chukwaHomeName = System.getenv("CHUKWA_HOME");
726     if (chukwaHomeName == null) {
727       chukwaHomeName = "";
728     }
729     File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
730 
731     log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
732 
733     String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
734     File chukwaConf;
735     if (chukwaConfName != null)
736       chukwaConf = new File(chukwaConfName).getAbsoluteFile();
737     else
738       chukwaConf = new File(chukwaHome, "conf");
739 
740     log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
741     File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
742     conf.addResource(new Path(agentConf.getAbsolutePath()));
743     if (conf.get("chukwaAgent.checkpoint.dir") == null)
744       conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
745           .getAbsolutePath());
746     conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
747         "initial_adaptors").getAbsolutePath());
748     try { 
749       Configuration chukwaAgentConf = new Configuration(false);
750       chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
751       Checker.checkConf(new OptDictionary(new File(new File(chukwaHome, "share/chukwa/lib"), "agent.dict")),
752           HSlurper.fromHConf(chukwaAgentConf));
753     } catch(Exception e) {
754       e.printStackTrace();
755     }    
756     return conf;
757   }
758 
759   public void shutdown() {
760     shutdown(false);
761   }
762 
763   /**
764    * Triggers agent shutdown. For now, this method doesn't shut down adaptors
765    * explicitly. It probably should.
766    */
767   public void shutdown(boolean exit) {
768     controlSock.shutdown(); // make sure we don't get new requests
769 
770     if (statsCollector != null) {
771       statsCollector.cancel();
772     }
773 
774     try {
775       jettyServer.stop();
776     } catch (Exception e) {
777       log.error("Couldn't stop jetty server.", e);
778     }
779 
780     if (checkpointer != null) {
781       checkpointer.cancel();
782       try {
783         if (needNewCheckpoint)
784           writeCheckpoint(); // write a last checkpoint here, before stopping
785       } catch (IOException e) {
786         log.debug(ExceptionUtil.getStackTrace(e));
787       }
788     }
789     // adaptors
790 
791     synchronized (adaptorsByName) {
792       // shut down each adaptor
793       for (Adaptor a : adaptorsByName.values()) {
794         try {
795           a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
796         } catch (AdaptorException e) {
797           log.warn("failed to cleanly stop " + a, e);
798         }
799       }
800     }
801     adaptorsByName.clear();
802     adaptorPositions.clear();
803     adaptorStatsManager.clear();
804     if (exit)
805       System.exit(0);
806   }
807 
808   /**
809    * Returns the control socket for this agent.
810    */
811   private AgentControlSocketListener getControlSock() {
812     return controlSock;
813   }
814 
815   public String getAdaptorName(Adaptor initiator) {
816     Offset o = adaptorPositions.get(initiator);
817     if(o != null)
818       return o.id;
819     else return null;
820   }
821 }