1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19
20 import javax.servlet.ServletConfig;
21 import javax.servlet.ServletException;
22 import javax.servlet.http.HttpServlet;
23 import javax.servlet.http.HttpServletRequest;
24 import javax.servlet.http.HttpServletResponse;
25 import org.apache.log4j.Logger;
26 import java.io.*;
27 import java.security.MessageDigest;
28 import java.security.NoSuchAlgorithmException;
29 import java.util.*;
30 import org.apache.hadoop.chukwa.*;
31 import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
32 import org.apache.hadoop.conf.Configuration;
33
34 public class LogDisplayServlet extends HttpServlet {
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public static final String DEFAULT_PATH = "logs";
59 public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
60 public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
61 long BUF_SIZE = 1024* 1024;
62
63 Configuration conf;
64 Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
65 Queue<String> receivedSIDs = new LinkedList<String>();
66 long totalStoredSize = 0;
67
68 private static final long serialVersionUID = -4602082382919009285L;
69 protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
70
71 public LogDisplayServlet() {
72 conf = new Configuration();
73 ExtractorWriter.recipient = this;
74 }
75
76 public LogDisplayServlet(Configuration c) {
77 conf = c;
78 ExtractorWriter.recipient = this;
79 }
80
81 public void init(ServletConfig servletConf) throws ServletException {
82 BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
83 }
84
85 @Override
86 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
87 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
88 }
89
90 private String getSID(Chunk c) {
91 try {
92 MessageDigest md;
93 md = MessageDigest.getInstance("MD5");
94
95 md.update(c.getSource().getBytes());
96 md.update(c.getStreamName().getBytes());
97 md.update(c.getTags().getBytes());
98 StringBuilder sb = new StringBuilder();
99 byte[] bytes = md.digest();
100 for(int i=0; i < bytes.length; ++i) {
101 if( (bytes[i] & 0xF0) == 0)
102 sb.append('0');
103 sb.append( Integer.toHexString(0xFF & bytes[i]) );
104 }
105 return sb.toString();
106 } catch(NoSuchAlgorithmException n) {
107 log.fatal(n);
108 System.exit(0);
109 return null;
110 }
111 }
112
113
114 private void pruneOldEntries() {
115 while(totalStoredSize > BUF_SIZE) {
116 String queueToPrune = receivedSIDs.remove();
117 Deque<Chunk> stream = chunksBySID.get(queueToPrune);
118 assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
119 Chunk c = stream.poll();
120 if(c != null)
121 totalStoredSize -= c.getData().length;
122 if(stream.isEmpty()) {
123 chunksBySID.remove(queueToPrune);
124 }
125 }
126 }
127
128 public synchronized void add(List<Chunk> chunks) {
129 for(Chunk c : chunks) {
130 String sid = getSID(c);
131 Deque<Chunk> stream = chunksBySID.get(sid);
132 if(stream == null) {
133 stream = new LinkedList<Chunk>();
134 chunksBySID.put(sid, stream);
135 }
136 stream.add(c);
137 receivedSIDs.add(sid);
138 totalStoredSize += c.getData().length;
139 }
140 pruneOldEntries();
141 }
142
143
144 @Override
145 protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
146 throws ServletException, IOException {
147
148 PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
149 resp.setStatus(200);
150 String path = req.getServletPath();
151 String streamID = req.getParameter("sid");
152 if (streamID != null) {
153 try {
154 Deque<Chunk> chunks = chunksBySID.get(streamID);
155 if(chunks != null) {
156 String streamName = getFriendlyName(chunks.peek());
157 out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
158 out.println("<pre>");
159 for(Chunk c: chunks) {
160 out.write(c.getData());
161 }
162 out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
163 } else
164 out.println("No data");
165 } catch(Exception e) {
166 out.println("<html><body>No data</body></html>");
167 }
168 out.println("</body></html>");
169 } else {
170 out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
171 for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet())
172 out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
173 out.println("</ul></body></html>");
174 }
175 out.flush();
176 }
177
178 private String getFriendlyName(Chunk chunk) {
179 if(chunk != null)
180 return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
181 else return "null";
182 }
183
184
185 }