View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.collector.servlet;
20  
21  
22  import java.io.DataInputStream;
23  import java.io.IOException;
24  import java.io.PrintStream;
25  import java.util.*;
26  import javax.servlet.ServletConfig;
27  import javax.servlet.ServletException;
28  import javax.servlet.ServletOutputStream;
29  import javax.servlet.http.HttpServlet;
30  import javax.servlet.http.HttpServletRequest;
31  import javax.servlet.http.HttpServletResponse;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.chukwa.Chunk;
34  import org.apache.hadoop.chukwa.ChunkImpl;
35  import org.apache.hadoop.chukwa.datacollection.writer.*;
36  import org.apache.hadoop.chukwa.util.DaemonWatcher;
37  import org.apache.log4j.Logger;
38  
39  public class ServletCollector extends HttpServlet {
40  
41    static final boolean FANCY_DIAGNOSTICS = false; // when true, accept() and doGet() also drive ServletDiagnostics
42    public static final String PATH = "chukwa"; // not referenced in this class; presumably the servlet's URL path -- confirm at the registration site
43    /**
44     * When a chunk is committed, its acknowledgement line starts with this string.
45     */
46    public static final String ACK_PREFIX = "ok: ";
47    org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null; // sink for received chunks; set via setWriter() or created in init()
48  
49    private static final long serialVersionUID = 6286162898591407111L;
50    Logger log = Logger.getRootLogger();// logs through the root logger; the class-specific Logger.getLogger(ServletCollector.class) is not used
51  
52    public void setWriter(ChukwaWriter w) {
53      writer = w;
54    }
55    
56    public ChukwaWriter getWriter() {
57      return writer;
58    }
59  
60    long statTime = 0L; // System.currentTimeMillis() of the last stats-timer run; 0 until the timer first fires
61    int numberHTTPConnection = 0; // accept() calls since the stats timer last reset it
62    int numberchunks = 0; // chunks counted by accept() since the stats timer last reset it
63    long lifetimechunks = 0; // chunks counted by accept() overall; never reset in this class
64  
65    Configuration conf; // supplied to the constructor; init() reads the writer class from it and hands it to writer.init()
66  
67    public ServletCollector(Configuration c) {
68      conf = c;
69    }
70  
71    public void init(ServletConfig servletConf) throws ServletException {
72  
73      log.info("initing servletCollector");
74      if (servletConf == null) {
75        log.fatal("no servlet config");
76        return;
77      }
78  
79      Timer statTimer = new Timer();
80      statTimer.schedule(new TimerTask() {
81        public void run() {
82          log.info("stats:ServletCollector,numberHTTPConnection:"
83              + numberHTTPConnection + ",numberchunks:" + numberchunks);
84          statTime = System.currentTimeMillis();
85          numberHTTPConnection = 0;
86          numberchunks = 0;
87        }
88      }, (1000), (60 * 1000));
89  
90      if (writer != null) {
91        log.info("writer set up statically, no need for Collector.init() to do it");
92        return;
93      }
94  
95      try {
96        String writerClassName = conf.get("chukwaCollector.writerClass",
97            SeqFileWriter.class.getCanonicalName());
98        Class<?> writerClass = Class.forName(writerClassName);
99        if (writerClass != null
100           && ChukwaWriter.class.isAssignableFrom(writerClass))
101         writer = (ChukwaWriter) writerClass.newInstance();
102     } catch (Exception e) {
103       log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
104     }
105 
106     // We default to here if the pipeline construction failed or didn't happen.
107     try {
108       if (writer == null) {
109         writer =  new SeqFileWriter();
110       }
111       
112       writer.init(conf);
113     } catch (Throwable e) {
114       log.warn("Exception trying to initialize SeqFileWriter",e);
115       DaemonWatcher.bailout(-1);
116     }
117   }
118 
119   @Override
120   protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
121     resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); 
122   }
123 
124   protected void accept(HttpServletRequest req, HttpServletResponse resp)
125       throws ServletException {
126     numberHTTPConnection++;
127     ServletDiagnostics diagnosticPage = new ServletDiagnostics();
128     final long currentTime = System.currentTimeMillis();
129     try {
130 
131       log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
132       java.io.InputStream in = req.getInputStream();
133 
134       ServletOutputStream l_out = resp.getOutputStream();
135       final DataInputStream di = new DataInputStream(in);
136       final int numEvents = di.readInt();
137       // log.info("saw " + numEvents+ " in request");
138 
139       if (FANCY_DIAGNOSTICS) {
140         diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
141       }
142 
143       List<Chunk> events = new LinkedList<Chunk>();
144       StringBuilder sb = new StringBuilder();
145 
146       for (int i = 0; i < numEvents; i++) {
147         ChunkImpl logEvent = ChunkImpl.read(di);
148         events.add(logEvent);
149 
150         if (FANCY_DIAGNOSTICS) {
151           diagnosticPage.sawChunk(logEvent, i);
152         }
153       }
154 
155       int responseStatus = HttpServletResponse.SC_OK;
156 
157       // write new data to data sync file
158       if (writer != null) {
159         ChukwaWriter.CommitStatus result = writer.add(events);
160 
161         // this is where we ACK this connection
162 
163         if(result == ChukwaWriter.COMMIT_OK) {
164           // only count the chunks if result is commit or commit pending
165           numberchunks += events.size();
166           lifetimechunks += events.size();
167 
168           for(Chunk receivedChunk: events) {
169             sb.append(ACK_PREFIX);
170             sb.append(receivedChunk.getData().length);
171             sb.append(" bytes ending at offset ");
172             sb.append(receivedChunk.getSeqID() - 1).append("\n");
173           }
174         } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
175 
176           // only count the chunks if result is commit or commit pending
177           numberchunks += events.size();
178           lifetimechunks += events.size();
179 
180           for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
181             sb.append(s);
182         } else if(result == ChukwaWriter.COMMIT_FAIL) {
183           sb.append("Commit failed");
184           responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
185         }
186 
187         l_out.print(sb.toString());
188       } else {
189         l_out.println("can't write: no writer");
190       }
191 
192       if (FANCY_DIAGNOSTICS) {
193         diagnosticPage.doneWithPost();
194       }
195 
196       resp.setStatus(responseStatus);
197 
198     } catch (Throwable e) {
199       log.warn("Exception talking to " + req.getRemoteHost() + " at t="
200           + currentTime, e);
201       throw new ServletException(e);
202     }
203   }
204 
205   @Override
206   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
207       throws ServletException, IOException {
208     accept(req, resp);
209   }
210 
211   @Override
212   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
213       throws ServletException, IOException {
214 
215 
216     log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
217     PrintStream out = new PrintStream(resp.getOutputStream());
218     resp.setStatus(200);
219 
220     String pingAtt = req.getParameter("ping");
221     if (pingAtt != null) {
222       out.println("Date:" + statTime);
223       out.println("Now:" + System.currentTimeMillis());
224       out.println("numberHTTPConnection in time window:"
225           + numberHTTPConnection);
226       out.println("numberchunks in time window:" + numberchunks);
227       out.println("lifetimechunks:" + lifetimechunks);
228     } else {
229       out.println("<html><body><h2>Chukwa servlet running</h2>");
230       if (FANCY_DIAGNOSTICS)
231         ServletDiagnostics.printPage(out);
232       out.println("</body></html>");
233     }
234 
235   }
236 
237   @Override
238   public String getServletInfo() {
239     return "Chukwa Servlet Collector";
240   }
241 
242   @Override
243   public void destroy() {
244     try {
245       writer.close();
246     } catch (WriterException e) {
247       log.warn("Exception during close", e);
248       e.printStackTrace();
249     }
250     super.destroy();
251   }
252 }