1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.collector;
20
21
22 import org.mortbay.jetty.*;
23 import org.mortbay.jetty.nio.SelectChannelConnector;
24 import org.mortbay.jetty.servlet.*;
25 import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
26 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
27 import org.apache.hadoop.chukwa.datacollection.writer.*;
28 import org.apache.hadoop.chukwa.util.DaemonWatcher;
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import edu.berkeley.confspell.Checker;
33 import edu.berkeley.confspell.HSlurper;
34 import edu.berkeley.confspell.OptDictionary;
35 import javax.servlet.http.HttpServlet;
36 import java.io.File;
37 import java.util.*;
38
39 public class CollectorStub {
40
41 static int THREADS = 120;
42 public static Server jettyServer = null;
43
44 public static void main(String[] args) {
45
46 DaemonWatcher.createInstance("Collector");
47 try {
48 if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
49 System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
50 System.out.println("A number of options can be specified here for debugging or special uses. e.g.: ");
51 System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
52 + "\n\tservlet=<classname>@path");
53 System.out.println("Command line options will override normal configuration.");
54 System.exit(0);
55 }
56
57 ChukwaConfiguration conf = new ChukwaConfiguration();
58
59 try {
60 Configuration collectorConf = new Configuration(false);
61 collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
62 collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
63 Checker.checkConf(new OptDictionary(new File(new File(conf.getChukwaHome(), "share/chukwa/lib"), "collector.dict")),
64 HSlurper.fromHConf(collectorConf));
65 } catch(Exception e) {e.printStackTrace();}
66
67 int portNum = conf.getInt("chukwaCollector.http.port", 9999);
68 THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
69
70
71 ChukwaWriter w = null;
72 Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
73 ServletCollector servletCollector = new ServletCollector(conf);
74 for(String arg: args) {
75 if(arg.startsWith("writer=")) {
76 String writerCmd = arg.substring("writer=".length());
77 if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
78 boolean verbose = !writerCmd.equals("pretend-quietly");
79 w = new ConsoleWriter(verbose);
80 w.init(conf);
81 servletCollector.setWriter(w);
82 } else
83 conf.set("chukwaCollector.writerClass", writerCmd);
84 } else if(arg.startsWith("servlet=")) {
85 String servletCmd = arg.substring("servlet=".length());
86 String[] halves = servletCmd.split("@");
87 try {
88 Class<?> servletClass = Class.forName(halves[0]);
89 HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
90 if(!halves[1].startsWith("/"))
91 halves[1] = "/" + halves[1];
92 servletsToAdd.put(halves[1], srvlet);
93 } catch(Exception e) {
94 e.printStackTrace();
95 }
96 } else if(arg.startsWith("portno=")) {
97 portNum = Integer.parseInt(arg.substring("portno=".length()));
98 } else {
99 System.out.println("WARNING: unknown command line arg " + arg);
100 System.out.println("Invoke collector with command line arg 'help' for usage");
101 }
102 }
103
104
105 SelectChannelConnector jettyConnector = new SelectChannelConnector();
106 jettyConnector.setLowResourcesConnections(THREADS - 10);
107 jettyConnector.setLowResourceMaxIdleTime(1500);
108 jettyConnector.setPort(portNum);
109
110
111 jettyServer = new Server(portNum);
112 jettyServer.setConnectors(new Connector[] { jettyConnector });
113 org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
114 pool.setMaxThreads(THREADS);
115 jettyServer.setThreadPool(pool);
116
117
118 Context root = new Context(jettyServer, "/", Context.SESSIONS);
119 root.addServlet(new ServletHolder(servletCollector), "/*");
120
121 if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
122 root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
123
124 if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
125 root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
126
127
128 root.setAllowNullPathInfo(false);
129
130
131 for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
132 root.addServlet(new ServletHolder(e.getValue()), e.getKey());
133 }
134
135
136 jettyServer.start();
137 jettyServer.setStopAtShutdown(true);
138
139 System.out.println("started Chukwa http collector on port " + portNum);
140 System.out.close();
141 System.err.close();
142 } catch (Exception e) {
143 e.printStackTrace();
144 DaemonWatcher.bailout(-1);
145 }
146
147 }
148
149 }