1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
20
21
22 import java.io.DataInputStream;
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.util.*;
26 import javax.servlet.ServletConfig;
27 import javax.servlet.ServletException;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.http.HttpServlet;
30 import javax.servlet.http.HttpServletRequest;
31 import javax.servlet.http.HttpServletResponse;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.chukwa.Chunk;
34 import org.apache.hadoop.chukwa.ChunkImpl;
35 import org.apache.hadoop.chukwa.datacollection.writer.*;
36 import org.apache.hadoop.chukwa.util.DaemonWatcher;
37 import org.apache.log4j.Logger;
38
39 public class ServletCollector extends HttpServlet {
40
41 static final boolean FANCY_DIAGNOSTICS = false;
42 public static final String PATH = "chukwa";
43
44
45
46 public static final String ACK_PREFIX = "ok: ";
47 org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
48
49 private static final long serialVersionUID = 6286162898591407111L;
50 Logger log = Logger.getRootLogger();
51
52 public void setWriter(ChukwaWriter w) {
53 writer = w;
54 }
55
56 public ChukwaWriter getWriter() {
57 return writer;
58 }
59
60 long statTime = 0L;
61 int numberHTTPConnection = 0;
62 int numberchunks = 0;
63 long lifetimechunks = 0;
64
65 Configuration conf;
66
67 public ServletCollector(Configuration c) {
68 conf = c;
69 }
70
71 public void init(ServletConfig servletConf) throws ServletException {
72
73 log.info("initing servletCollector");
74 if (servletConf == null) {
75 log.fatal("no servlet config");
76 return;
77 }
78
79 Timer statTimer = new Timer();
80 statTimer.schedule(new TimerTask() {
81 public void run() {
82 log.info("stats:ServletCollector,numberHTTPConnection:"
83 + numberHTTPConnection + ",numberchunks:" + numberchunks);
84 statTime = System.currentTimeMillis();
85 numberHTTPConnection = 0;
86 numberchunks = 0;
87 }
88 }, (1000), (60 * 1000));
89
90 if (writer != null) {
91 log.info("writer set up statically, no need for Collector.init() to do it");
92 return;
93 }
94
95 try {
96 String writerClassName = conf.get("chukwaCollector.writerClass",
97 SeqFileWriter.class.getCanonicalName());
98 Class<?> writerClass = Class.forName(writerClassName);
99 if (writerClass != null
100 && ChukwaWriter.class.isAssignableFrom(writerClass))
101 writer = (ChukwaWriter) writerClass.newInstance();
102 } catch (Exception e) {
103 log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
104 }
105
106
107 try {
108 if (writer == null) {
109 writer = new SeqFileWriter();
110 }
111
112 writer.init(conf);
113 } catch (Throwable e) {
114 log.warn("Exception trying to initialize SeqFileWriter",e);
115 DaemonWatcher.bailout(-1);
116 }
117 }
118
119 @Override
120 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
121 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
122 }
123
124 protected void accept(HttpServletRequest req, HttpServletResponse resp)
125 throws ServletException {
126 numberHTTPConnection++;
127 ServletDiagnostics diagnosticPage = new ServletDiagnostics();
128 final long currentTime = System.currentTimeMillis();
129 try {
130
131 log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
132 java.io.InputStream in = req.getInputStream();
133
134 ServletOutputStream l_out = resp.getOutputStream();
135 final DataInputStream di = new DataInputStream(in);
136 final int numEvents = di.readInt();
137
138
139 if (FANCY_DIAGNOSTICS) {
140 diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
141 }
142
143 List<Chunk> events = new LinkedList<Chunk>();
144 StringBuilder sb = new StringBuilder();
145
146 for (int i = 0; i < numEvents; i++) {
147 ChunkImpl logEvent = ChunkImpl.read(di);
148 events.add(logEvent);
149
150 if (FANCY_DIAGNOSTICS) {
151 diagnosticPage.sawChunk(logEvent, i);
152 }
153 }
154
155 int responseStatus = HttpServletResponse.SC_OK;
156
157
158 if (writer != null) {
159 ChukwaWriter.CommitStatus result = writer.add(events);
160
161
162
163 if(result == ChukwaWriter.COMMIT_OK) {
164
165 numberchunks += events.size();
166 lifetimechunks += events.size();
167
168 for(Chunk receivedChunk: events) {
169 sb.append(ACK_PREFIX);
170 sb.append(receivedChunk.getData().length);
171 sb.append(" bytes ending at offset ");
172 sb.append(receivedChunk.getSeqID() - 1).append("\n");
173 }
174 } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
175
176
177 numberchunks += events.size();
178 lifetimechunks += events.size();
179
180 for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
181 sb.append(s);
182 } else if(result == ChukwaWriter.COMMIT_FAIL) {
183 sb.append("Commit failed");
184 responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
185 }
186
187 l_out.print(sb.toString());
188 } else {
189 l_out.println("can't write: no writer");
190 }
191
192 if (FANCY_DIAGNOSTICS) {
193 diagnosticPage.doneWithPost();
194 }
195
196 resp.setStatus(responseStatus);
197
198 } catch (Throwable e) {
199 log.warn("Exception talking to " + req.getRemoteHost() + " at t="
200 + currentTime, e);
201 throw new ServletException(e);
202 }
203 }
204
205 @Override
206 protected void doPost(HttpServletRequest req, HttpServletResponse resp)
207 throws ServletException, IOException {
208 accept(req, resp);
209 }
210
211 @Override
212 protected void doGet(HttpServletRequest req, HttpServletResponse resp)
213 throws ServletException, IOException {
214
215
216 log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
217 PrintStream out = new PrintStream(resp.getOutputStream());
218 resp.setStatus(200);
219
220 String pingAtt = req.getParameter("ping");
221 if (pingAtt != null) {
222 out.println("Date:" + statTime);
223 out.println("Now:" + System.currentTimeMillis());
224 out.println("numberHTTPConnection in time window:"
225 + numberHTTPConnection);
226 out.println("numberchunks in time window:" + numberchunks);
227 out.println("lifetimechunks:" + lifetimechunks);
228 } else {
229 out.println("<html><body><h2>Chukwa servlet running</h2>");
230 if (FANCY_DIAGNOSTICS)
231 ServletDiagnostics.printPage(out);
232 out.println("</body></html>");
233 }
234
235 }
236
237 @Override
238 public String getServletInfo() {
239 return "Chukwa Servlet Collector";
240 }
241
242 @Override
243 public void destroy() {
244 try {
245 writer.close();
246 } catch (WriterException e) {
247 log.warn("Exception during close", e);
248 e.printStackTrace();
249 }
250 super.destroy();
251 }
252 }