View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.collector;
20  
21  
22  import org.mortbay.jetty.*;
23  import org.mortbay.jetty.nio.SelectChannelConnector;
24  import org.mortbay.jetty.servlet.*;
25  import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
26  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
27  import org.apache.hadoop.chukwa.datacollection.writer.*;
28  import org.apache.hadoop.chukwa.util.DaemonWatcher;
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.Path;
32  import edu.berkeley.confspell.Checker;
33  import edu.berkeley.confspell.HSlurper;
34  import edu.berkeley.confspell.OptDictionary;
35  import javax.servlet.http.HttpServlet;
36  import java.io.File;
37  import java.util.*;
38  
39  public class CollectorStub {
40  
41    static int THREADS = 120;
42    public static Server jettyServer = null;
43  
44    public static void main(String[] args) {
45  
46      DaemonWatcher.createInstance("Collector");
47      try {
48        if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
49          System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
50          System.out.println("A number of options can be specified here for debugging or special uses.  e.g.: ");
51          System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
52                      + "\n\tservlet=<classname>@path");
53          System.out.println("Command line options will override normal configuration.");
54          System.exit(0);
55        }
56  
57        ChukwaConfiguration conf = new ChukwaConfiguration();
58        
59        try {
60          Configuration collectorConf = new Configuration(false);
61          collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
62          collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
63          Checker.checkConf(new OptDictionary(new File(new File(conf.getChukwaHome(), "share/chukwa/lib"), "collector.dict")),
64              HSlurper.fromHConf(collectorConf));
65        } catch(Exception e) {e.printStackTrace();}
66        
67        int portNum = conf.getInt("chukwaCollector.http.port", 9999);
68        THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
69  
70        // pick a writer.
71        ChukwaWriter w = null;
72        Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
73        ServletCollector servletCollector = new ServletCollector(conf);
74        for(String arg: args) {
75          if(arg.startsWith("writer=")) {       //custom writer class
76            String writerCmd = arg.substring("writer=".length());
77            if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
78              boolean verbose = !writerCmd.equals("pretend-quietly");
79              w = new ConsoleWriter(verbose);
80              w.init(conf);
81              servletCollector.setWriter(w);
82            } else 
83              conf.set("chukwaCollector.writerClass", writerCmd);
84          } else if(arg.startsWith("servlet=")) {     //adding custom servlet
85             String servletCmd = arg.substring("servlet=".length()); 
86             String[] halves = servletCmd.split("@");
87             try {
88               Class<?> servletClass = Class.forName(halves[0]);
89               HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
90               if(!halves[1].startsWith("/"))
91                 halves[1] = "/" + halves[1];
92               servletsToAdd.put(halves[1], srvlet);
93             } catch(Exception e) {
94               e.printStackTrace();
95             }
96          } else if(arg.startsWith("portno=")) {
97            portNum = Integer.parseInt(arg.substring("portno=".length()));
98          } else { //unknown arg
99            System.out.println("WARNING: unknown command line arg " + arg);
100           System.out.println("Invoke collector with command line arg 'help' for usage");
101         }
102       }
103 
104       // Set up jetty connector
105       SelectChannelConnector jettyConnector = new SelectChannelConnector();
106       jettyConnector.setLowResourcesConnections(THREADS - 10);
107       jettyConnector.setLowResourceMaxIdleTime(1500);
108       jettyConnector.setPort(portNum);
109       
110       // Set up jetty server proper, using connector
111       jettyServer = new Server(portNum);
112       jettyServer.setConnectors(new Connector[] { jettyConnector });
113       org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
114       pool.setMaxThreads(THREADS);
115       jettyServer.setThreadPool(pool);
116       
117       // Add the collector servlet to server
118       Context root = new Context(jettyServer, "/", Context.SESSIONS);
119       root.addServlet(new ServletHolder(servletCollector), "/*");
120       
121       if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
122         root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
123 
124       if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
125         root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
126 
127       
128       root.setAllowNullPathInfo(false);
129 
130       // Add in any user-specified servlets
131       for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
132         root.addServlet(new ServletHolder(e.getValue()), e.getKey());
133       }
134       
135       // And finally, fire up the server
136       jettyServer.start();
137       jettyServer.setStopAtShutdown(true);
138 
139       System.out.println("started Chukwa http collector on port " + portNum);
140       System.out.close();
141       System.err.close();
142     } catch (Exception e) {
143       e.printStackTrace();
144       DaemonWatcher.bailout(-1);
145     }
146 
147   }
148 
149 }