1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedReader;
23 import java.io.BufferedWriter;
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.FilenameFilter;
29 import java.io.IOException;
30 import java.io.InputStreamReader;
31 import java.io.OutputStreamWriter;
32 import java.io.PrintWriter;
33 import java.security.NoSuchAlgorithmException;
34 import java.util.*;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.regex.Matcher;
37 import java.util.regex.Pattern;
38
39 import org.apache.hadoop.chukwa.datacollection.DataFactory;
40 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
41 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
42 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
43 import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
44 import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
45 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
46 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
47 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
48 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
49 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
50 import org.apache.hadoop.chukwa.util.DaemonWatcher;
51 import org.apache.hadoop.chukwa.util.ExceptionUtil;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.hadoop.fs.Path;
54 import org.apache.log4j.Logger;
55
56 import org.mortbay.jetty.Server;
57 import org.mortbay.jetty.servlet.Context;
58 import org.mortbay.jetty.servlet.ServletHolder;
59 import org.mortbay.jetty.nio.SelectChannelConnector;
60 import org.mortbay.thread.BoundedThreadPool;
61 import com.sun.jersey.spi.container.servlet.ServletContainer;
62 import edu.berkeley.confspell.*;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class ChukwaAgent implements AdaptorManager {
79
80 static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
81
82 private static final int HTTP_SERVER_THREADS = 120;
83 private static Server jettyServer = null;
84 private OffsetStatsManager adaptorStatsManager = null;
85 private Timer statsCollector = null;
86
87 static Logger log = Logger.getLogger(ChukwaAgent.class);
88 static ChukwaAgent agent = null;
89
90 public static ChukwaAgent getAgent() {
91 return agent;
92 }
93
94 static Configuration conf = null;
95 Connector connector = null;
96
97
98 public static class Offset {
99 public Offset(long l, String id) {
100 offset = l;
101 this.id = id;
102 }
103
104 final String id;
105 volatile long offset;
106 public long offset() {
107 return this.offset;
108 }
109
110 public String adaptorID() {
111 return id;
112 }
113 }
114
115 public static class AlreadyRunningException extends Exception {
116
117 private static final long serialVersionUID = 1L;
118
119 public AlreadyRunningException() {
120 super("Agent already running; aborting");
121 }
122 }
123
124 private final Map<Adaptor, Offset> adaptorPositions;
125
126
127
128 private final Map<String, Adaptor> adaptorsByName;
129
130 private File checkpointDir;
131
132 private String CHECKPOINT_BASE_NAME;
133
134 private static String tags = "";
135
136 private Timer checkpointer;
137 private volatile boolean needNewCheckpoint = false;
138
139
140 private int checkpointNumber;
141
142
143 private final AgentControlSocketListener controlSock;
144
145 public int getControllerPort() {
146 return controlSock.getPort();
147 }
148
149 public OffsetStatsManager getAdaptorStatsManager() {
150 return adaptorStatsManager;
151 }
152
153
154
155
156
157 public static void main(String[] args) throws AdaptorException {
158
159 DaemonWatcher.createInstance("Agent");
160
161 try {
162 if (args.length > 0 && args[0].equals("-help")) {
163 System.out.println("usage: LocalAgent [-noCheckPoint]"
164 + "[default collector URL]");
165 System.exit(0);
166 }
167
168 conf = readConfig();
169 agent = new ChukwaAgent(conf);
170
171 if (agent.anotherAgentIsRunning()) {
172 System.out
173 .println("another agent is running (or port has been usurped). "
174 + "Bailing out now");
175 throw new AlreadyRunningException();
176 }
177
178 int uriArgNumber = 0;
179 if (args.length > 0) {
180 if (args[uriArgNumber].equals("local"))
181 agent.connector = new ConsoleOutConnector(agent);
182 else {
183 if (!args[uriArgNumber].contains("://"))
184 args[uriArgNumber] = "http://" + args[uriArgNumber];
185 agent.connector = new HttpConnector(agent, args[uriArgNumber]);
186 }
187 } else
188 agent.connector = new HttpConnector(agent);
189
190 agent.connector.start();
191
192 log.info("local agent started on port " + agent.getControlSock().portno);
193 System.out.close();
194 System.err.close();
195 } catch (AlreadyRunningException e) {
196 log.error("agent started already on this machine with same portno;"
197 + " bailing out");
198 System.out
199 .println("agent started already on this machine with same portno;"
200 + " bailing out");
201 DaemonWatcher.bailout(-1);
202 System.exit(0);
203 } catch (Exception e) {
204 e.printStackTrace();
205 }
206 }
207
208 private boolean anotherAgentIsRunning() {
209 return !controlSock.isBound();
210 }
211
212
213
214
215 @Override
216 public int adaptorCount() {
217 synchronized(adaptorsByName) {
218 return adaptorsByName.size();
219 }
220 }
221
/**
 * Creates an agent backed by a freshly constructed {@link Configuration}.
 *
 * @throws AlreadyRunningException if the control socket cannot be bound
 */
public ChukwaAgent() throws AlreadyRunningException {
  this(new Configuration());
}
225
/**
 * Creates and starts an agent from the given configuration.
 *
 * <p>In order, the constructor: registers itself as the static agent, reads
 * checkpoint and statistics settings, restores adaptors from the latest
 * checkpoint (when enabled and a checkpoint directory is configured), loads
 * the optional initial-adaptors file, binds and starts the control socket,
 * starts the HTTP server and statistics collector, and finally schedules
 * periodic checkpointing.
 *
 * @param conf configuration to read the {@code chukwaAgent.*} settings from
 * @throws AlreadyRunningException if the control socket cannot be bound
 * @throws RuntimeException if the HTTP server cannot be started
 */
public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
  ChukwaAgent.agent = this;
  ChukwaAgent.conf = conf;

  adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
  adaptorsByName = new HashMap<String, Adaptor>();
  checkpointNumber = 0;

  boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
      "chukwaAgent.checkpoint.enabled", true);
  CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
      "chukwa_checkpoint_");
  final int CHECKPOINT_INTERVAL_MS = conf.getInt(
      "chukwaAgent.checkpoint.interval", 5000);
  final int STATS_INTERVAL_MS = conf.getInt(
      "chukwaAgent.stats.collection.interval", 10000);
  final int STATS_DATA_TTL_MS = conf.getInt(
      "chukwaAgent.stats.data.ttl", 1200000);

  // Without a checkpoint directory there is nothing to restore from.
  String checkpointDirName = conf.get("chukwaAgent.checkpoint.dir");
  if (checkpointDirName != null) {
    checkpointDir = new File(checkpointDirName);
  } else {
    DO_CHECKPOINT_RESTORE = false;
  }

  if (checkpointDir != null && !checkpointDir.exists()) {
    if (!checkpointDir.mkdirs() && !checkpointDir.exists()) {
      log.warn("could not create checkpoint dir " + checkpointDir);
    }
  }

  // Read the tags once so the logged value is exactly the default tag that
  // is registered with the DataFactory.
  tags = conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\"");
  DataFactory.getInstance().addDefaultTag(tags);

  log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
  log.info("Config - checkpointDir: [" + checkpointDir + "]");
  log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
      + "]");
  log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
  log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
  log.info("Config - tags: [" + tags + "]");

  if (DO_CHECKPOINT_RESTORE) {
    log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
  }

  File initialAdaptors = null;
  if (conf.get("chukwaAgent.initial_adaptors") != null) {
    initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
  }

  try {
    if (DO_CHECKPOINT_RESTORE) {
      restoreFromCheckpoint();
    }
  } catch (IOException e) {
    log.warn("failed to restart from checkpoint: ", e);
  }

  try {
    if (initialAdaptors != null && initialAdaptors.exists()) {
      readAdaptorsFile(initialAdaptors);
    }
  } catch (IOException e) {
    log.warn("couldn't read user-specified file "
        + initialAdaptors.getAbsolutePath(), e);
  }

  controlSock = new AgentControlSocketListener(this);
  try {
    controlSock.tryToBind();

    controlSock.start();
    log.info("control socket started on port " + controlSock.portno);

    try {
      this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
      this.statsCollector = new Timer("ChukwaAgent Stats Collector");

      startHttpServer(conf);

      statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
          STATS_INTERVAL_MS, STATS_INTERVAL_MS);
    } catch (Exception e) {
      log.error("Couldn't start HTTP server", e);
      throw new RuntimeException(e);
    }

    // Periodic checkpoints need both a positive interval and a directory.
    if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
      checkpointer = new Timer();
      checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
    }
  } catch (IOException e) {
    log.info("failed to bind to socket; aborting agent launch", e);
    throw new AlreadyRunningException();
  }

}
323
324 private void startHttpServer(Configuration conf) throws Exception {
325 int portNum = conf.getInt("chukwaAgent.http.port", 9090);
326 String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
327 StringBuilder jaxRsPackages = new StringBuilder(
328 "org.apache.hadoop.chukwa.datacollection.agent.rest");
329
330
331 if (jaxRsAddlPackages != null)
332 jaxRsPackages.append(';').append(jaxRsAddlPackages);
333
334
335 SelectChannelConnector jettyConnector = new SelectChannelConnector();
336 jettyConnector.setLowResourcesConnections(HTTP_SERVER_THREADS - 10);
337 jettyConnector.setLowResourceMaxIdleTime(1500);
338 jettyConnector.setPort(portNum);
339
340
341 jettyServer = new Server(portNum);
342 jettyServer.setConnectors(new org.mortbay.jetty.Connector[] { jettyConnector });
343 BoundedThreadPool pool = new BoundedThreadPool();
344 pool.setMaxThreads(HTTP_SERVER_THREADS);
345 jettyServer.setThreadPool(pool);
346
347
348 ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
349 servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
350 "com.sun.jersey.api.core.PackagesResourceConfig");
351 servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
352 jaxRsPackages.toString());
353
354
355 Context root = new Context(jettyServer, "/rest/v1", Context.SESSIONS);
356 root.setAttribute("ChukwaAgent", this);
357 root.addServlet(servletHolder, "/*");
358 root.setAllowNullPathInfo(false);
359
360
361 jettyServer.start();
362 jettyServer.setStopAtShutdown(true);
363
364 log.info("started Chukwa http agent interface on port " + portNum);
365 }
366
367
368
369
370 private class StatsCollectorTask extends TimerTask {
371
372 public void run() {
373 long now = System.currentTimeMillis();
374
375 for(String adaptorId : getAdaptorList().keySet()) {
376 Adaptor adaptor = getAdaptor(adaptorId);
377 if(adaptor == null) continue;
378
379 Offset offset = adaptorPositions.get(adaptor);
380 if(offset == null) continue;
381
382 adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
383 }
384 }
385 }
386
387
388
389
390
391
392
393
394
395
396
397 private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+"
398
399
400
401 + "(?:"
402 + "([^\\s=]+)"
403 + "\\s*=\\s*"
404 + ")?"
405 + "([^\\s=]+)\\s+"
406 + "(\\S+)\\s+"
407 + "(?:"
408 + "(.*?)\\s+"
409
410 + ")?"
411 + "(\\d+)\\s*");
412
413
414
415
416
417
418
419
420 public String processAddCommand(String cmd) {
421 try {
422 return processAddCommandE(cmd);
423 } catch(AdaptorException e) {
424 return null;
425 }
426 }
427
428
429 public String processAddCommandE(String cmd) throws AdaptorException {
430 Matcher m = addCmdPattern.matcher(cmd);
431 if (m.matches()) {
432 long offset;
433 try {
434 offset = Long.parseLong(m.group(5));
435 } catch (NumberFormatException e) {
436 log.warn("malformed line " + cmd);
437 throw new AdaptorException("bad input syntax");
438 }
439
440 String adaptorID = m.group(1);
441 String adaptorClassName = m.group(2);
442 String dataType = m.group(3);
443 String params = m.group(4);
444 if (params == null)
445 params = "";
446
447 Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
448 if (adaptor == null) {
449 log.warn("Error creating adaptor of class " + adaptorClassName);
450 throw new AdaptorException("Can't load class " + adaptorClassName);
451 }
452 String coreParams = adaptor.parseArgs(dataType,params,this);
453 if(coreParams == null) {
454 log.warn("invalid params for adaptor: " + params);
455 throw new AdaptorException("invalid params for adaptor: " + params);
456 }
457
458 if(adaptorID == null) {
459 try {
460 adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
461 } catch(NoSuchAlgorithmException e) {
462 log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
463 shutdown(true);
464 }
465 } else if(!adaptorID.startsWith("adaptor_"))
466 adaptorID = "adaptor_"+adaptorID;
467
468 synchronized (adaptorsByName) {
469
470 if(adaptorsByName.containsKey(adaptorID))
471 return adaptorID;
472 adaptorsByName.put(adaptorID, adaptor);
473 adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
474 needNewCheckpoint = true;
475 try {
476 adaptor.start(adaptorID, dataType, offset, DataFactory
477 .getInstance().getEventQueue());
478 log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
479 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
480 ChukwaAgent.agentMetrics.addedAdaptor.inc();
481 return adaptorID;
482
483 } catch (Exception e) {
484 Adaptor failed = adaptorsByName.remove(adaptorID);
485 adaptorPositions.remove(failed);
486 adaptorStatsManager.remove(failed);
487 log.warn("failed to start adaptor", e);
488 if(e instanceof AdaptorException)
489 throw (AdaptorException)e;
490 }
491 }
492 } else if (cmd.length() > 0)
493 log.warn("only 'add' command supported in config files; cmd was: " + cmd);
494
495
496 return null;
497 }
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513 private boolean restoreFromCheckpoint() throws IOException {
514 synchronized (checkpointDir) {
515 String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
516 public boolean accept(File dir, String name) {
517 return name.startsWith(CHECKPOINT_BASE_NAME);
518 }
519 });
520
521 if (checkpointNames == null) {
522 log.error("Unable to list files in checkpoint dir");
523 return false;
524 }
525 if (checkpointNames.length == 0) {
526 log.info("No checkpoints found in " + checkpointDir);
527 return false;
528 }
529
530 if (checkpointNames.length > 2)
531 log.warn("expected at most two checkpoint files in " + checkpointDir
532 + "; saw " + checkpointNames.length);
533 else if (checkpointNames.length == 0)
534 return false;
535
536 String lowestName = null;
537 int lowestIndex = Integer.MAX_VALUE;
538 for (String n : checkpointNames) {
539 int index = Integer
540 .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
541 if (index < lowestIndex) {
542 lowestName = n;
543 lowestIndex = index;
544 }
545 }
546
547 checkpointNumber = lowestIndex + 1;
548 File checkpoint = new File(checkpointDir, lowestName);
549 readAdaptorsFile(checkpoint);
550 }
551 return true;
552 }
553
554 private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
555 IOException {
556 log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
557 BufferedReader br = new BufferedReader(new InputStreamReader(
558 new FileInputStream(checkpoint)));
559 String cmd = null;
560 while ((cmd = br.readLine()) != null)
561 processAddCommand(cmd);
562 br.close();
563 }
564
565
566
567
568
569
570 private void writeCheckpoint() throws IOException {
571 needNewCheckpoint = false;
572 synchronized (checkpointDir) {
573 log.info("writing checkpoint " + checkpointNumber);
574
575 FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
576 CHECKPOINT_BASE_NAME + checkpointNumber));
577 PrintWriter out = new PrintWriter(new BufferedWriter(
578 new OutputStreamWriter(fos)));
579
580 for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
581 out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
582 }
583
584 out.close();
585 File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
586 + (checkpointNumber - 1));
587 log.debug("hopefully removing old checkpoint file "
588 + lastCheckpoint.getAbsolutePath());
589 lastCheckpoint.delete();
590 checkpointNumber++;
591 }
592 }
593
594 public String reportCommit(Adaptor src, long uuid) {
595 needNewCheckpoint = true;
596 Offset o = adaptorPositions.get(src);
597 if (o != null) {
598 synchronized (o) {
599
600 if (uuid > o.offset)
601 o.offset = uuid;
602 }
603 log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
604 if(src instanceof NotifyOnCommitAdaptor) {
605 ((NotifyOnCommitAdaptor) src).committed(uuid);
606 }
607 return o.id;
608 } else {
609 log.warn("got commit up to " + uuid + " for adaptor " + src
610 + " that doesn't appear to be running: " + adaptorCount()
611 + " total");
612 return null;
613 }
614 }
615
616 private class CheckpointTask extends TimerTask {
617 public void run() {
618 try {
619 if (needNewCheckpoint) {
620 writeCheckpoint();
621 }
622 } catch (IOException e) {
623 log.warn("failed to write checkpoint", e);
624 }
625 }
626 }
627
628
629 private String formatAdaptorStatus(Adaptor a) {
630 return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() +
631 " " + adaptorPositions.get(a).offset;
632 }
633
634
635
636
637
638
639 public Map<String, String> getAdaptorList() {
640 Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
641 synchronized (adaptorsByName) {
642 for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
643 adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
644 }
645 }
646 return adaptors;
647 }
648
649
650 public long stopAdaptor(String name, boolean gracefully) {
651 if (gracefully)
652 return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
653 else
654 return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
655 }
656
657
658
659
660
661
662
663
664
665
666
667
668
669 public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
670 Adaptor toStop;
671 long offset = -1;
672
673
674
675
676 synchronized (adaptorsByName) {
677 toStop = adaptorsByName.remove(name);
678 }
679 if (toStop == null) {
680 log.warn("trying to stop " + name + " that isn't running");
681 return offset;
682 } else {
683 adaptorPositions.remove(toStop);
684 adaptorStatsManager.remove(toStop);
685 }
686 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
687 ChukwaAgent.agentMetrics.removedAdaptor.inc();
688
689 try {
690 offset = toStop.shutdown(shutdownMode);
691 log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
692 + toStop.getCurrentStatus());
693 } catch (AdaptorException e) {
694 log.error("adaptor failed to stop cleanly", e);
695 } finally {
696 needNewCheckpoint = true;
697 }
698 return offset;
699 }
700
701 @Override
702 public Configuration getConfiguration() {
703 return conf;
704 }
705
706 @Override
707 public Adaptor getAdaptor(String name) {
708 synchronized(adaptorsByName) {
709 return adaptorsByName.get(name);
710 }
711 }
712
713 public Offset offset(Adaptor a) {
714 Offset o = adaptorPositions.get(a);
715 return o;
716 }
717
718 Connector getConnector() {
719 return connector;
720 }
721
722 private static Configuration readConfig() {
723 Configuration conf = new Configuration();
724
725 String chukwaHomeName = System.getenv("CHUKWA_HOME");
726 if (chukwaHomeName == null) {
727 chukwaHomeName = "";
728 }
729 File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
730
731 log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
732
733 String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
734 File chukwaConf;
735 if (chukwaConfName != null)
736 chukwaConf = new File(chukwaConfName).getAbsoluteFile();
737 else
738 chukwaConf = new File(chukwaHome, "conf");
739
740 log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
741 File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
742 conf.addResource(new Path(agentConf.getAbsolutePath()));
743 if (conf.get("chukwaAgent.checkpoint.dir") == null)
744 conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
745 .getAbsolutePath());
746 conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
747 "initial_adaptors").getAbsolutePath());
748 try {
749 Configuration chukwaAgentConf = new Configuration(false);
750 chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
751 Checker.checkConf(new OptDictionary(new File(new File(chukwaHome, "share/chukwa/lib"), "agent.dict")),
752 HSlurper.fromHConf(chukwaAgentConf));
753 } catch(Exception e) {
754 e.printStackTrace();
755 }
756 return conf;
757 }
758
759 public void shutdown() {
760 shutdown(false);
761 }
762
763
764
765
766
767 public void shutdown(boolean exit) {
768 controlSock.shutdown();
769
770 if (statsCollector != null) {
771 statsCollector.cancel();
772 }
773
774 try {
775 jettyServer.stop();
776 } catch (Exception e) {
777 log.error("Couldn't stop jetty server.", e);
778 }
779
780 if (checkpointer != null) {
781 checkpointer.cancel();
782 try {
783 if (needNewCheckpoint)
784 writeCheckpoint();
785 } catch (IOException e) {
786 log.debug(ExceptionUtil.getStackTrace(e));
787 }
788 }
789
790
791 synchronized (adaptorsByName) {
792
793 for (Adaptor a : adaptorsByName.values()) {
794 try {
795 a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
796 } catch (AdaptorException e) {
797 log.warn("failed to cleanly stop " + a, e);
798 }
799 }
800 }
801 adaptorsByName.clear();
802 adaptorPositions.clear();
803 adaptorStatsManager.clear();
804 if (exit)
805 System.exit(0);
806 }
807
808
809
810
811 private AgentControlSocketListener getControlSock() {
812 return controlSock;
813 }
814
815 public String getAdaptorName(Adaptor initiator) {
816 Offset o = adaptorPositions.get(initiator);
817 if(o != null)
818 return o.id;
819 else return null;
820 }
821 }