View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19  
20  import javax.servlet.ServletConfig;
21  import javax.servlet.ServletException;
22  import javax.servlet.http.HttpServlet;
23  import javax.servlet.http.HttpServletRequest;
24  import javax.servlet.http.HttpServletResponse;
25  import org.apache.log4j.Logger;
26  import java.io.*;
27  import java.security.MessageDigest;
28  import java.security.NoSuchAlgorithmException;
29  import java.util.*;
30  import org.apache.hadoop.chukwa.*;
31  import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
32  import org.apache.hadoop.conf.Configuration;
33  
34  public class LogDisplayServlet extends HttpServlet {
35    
36    /*
37    static class StreamName {
38      byte[] md5;
39      public StreamName(Chunk c) {
40    
41      }
42      @Override
43      public int hashCode() {
44        int x=0;
45        for(int i=0; i< md5.length; ++i) {
46          x ^= (md5[i] << 4 * i);
47        }
48        return x;
49      }
50      
51      public boolean equals(Object x) {
52        if(x instanceof StreamName)
53          return Arrays.equals(md5, ((StreamName)x).md5);
54        else return false;
55      }
56    }*/
57    
58    public static final String DEFAULT_PATH = "logs";
59    public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
60    public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
61    long BUF_SIZE = 1024* 1024;
62    
63    Configuration conf;
64    Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
65    Queue<String> receivedSIDs = new LinkedList<String>();
66    long totalStoredSize = 0;
67  
68    private static final long serialVersionUID = -4602082382919009285L;
69    protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
70    
71    public LogDisplayServlet() {
72      conf = new Configuration();
73      ExtractorWriter.recipient = this;
74    }
75    
76    public LogDisplayServlet(Configuration c) {
77      conf = c;
78      ExtractorWriter.recipient = this;
79    }
80  
81    public void init(ServletConfig servletConf) throws ServletException {
82      BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
83    }
84  
85    @Override
86    protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
87      resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); 
88    }
89    
90    private String getSID(Chunk c) {
91      try { 
92        MessageDigest md;
93        md = MessageDigest.getInstance("MD5");
94    
95        md.update(c.getSource().getBytes());
96        md.update(c.getStreamName().getBytes());
97        md.update(c.getTags().getBytes());
98        StringBuilder sb = new StringBuilder();
99        byte[] bytes = md.digest();
100       for(int i=0; i < bytes.length; ++i) {
101         if( (bytes[i] & 0xF0) == 0)
102           sb.append('0');
103         sb.append( Integer.toHexString(0xFF & bytes[i]) );
104       }
105       return sb.toString();
106     } catch(NoSuchAlgorithmException n) {
107       log.fatal(n);
108       System.exit(0);
109       return null;
110     }
111   }
112   
113   
114   private void pruneOldEntries() {
115     while(totalStoredSize > BUF_SIZE) {
116       String queueToPrune = receivedSIDs.remove();
117       Deque<Chunk> stream = chunksBySID.get(queueToPrune);
118       assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
119       Chunk c = stream.poll();
120       if(c != null)
121         totalStoredSize -= c.getData().length;
122       if(stream.isEmpty()) {  //remove empty deques and their names.
123         chunksBySID.remove(queueToPrune);
124       }
125     }
126   }
127   
128   public synchronized void add(List<Chunk> chunks) {
129     for(Chunk c : chunks) {
130       String sid = getSID(c);
131       Deque<Chunk> stream = chunksBySID.get(sid);
132       if(stream == null) {
133         stream = new LinkedList<Chunk>();
134         chunksBySID.put(sid, stream);
135       }
136       stream.add(c);
137       receivedSIDs.add(sid);
138       totalStoredSize += c.getData().length;
139     }
140     pruneOldEntries();
141   }
142   
143 
144   @Override
145   protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
146       throws ServletException, IOException  {
147   
148     PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
149     resp.setStatus(200);
150     String path = req.getServletPath();
151     String streamID = req.getParameter("sid");
152     if (streamID != null) {
153       try {
154         Deque<Chunk> chunks = chunksBySID.get(streamID);
155         if(chunks != null) {
156           String streamName = getFriendlyName(chunks.peek());
157           out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
158           out.println("<pre>");
159           for(Chunk c: chunks) {
160             out.write(c.getData());
161           }
162           out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
163         } else
164           out.println("No data");
165       } catch(Exception e) {
166         out.println("<html><body>No data</body></html>");
167       }
168       out.println("</body></html>");
169     } else {
170       out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
171       for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet()) 
172         out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
173       out.println("</ul></body></html>");
174     }
175     out.flush();
176   }
177 
178   private String getFriendlyName(Chunk chunk) {
179     if(chunk != null)
180       return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
181     else return "null";
182   }
183 
184 
185 }