
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.collector.servlet;

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.*;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.log4j.Logger;

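/**
 * Reports which data sink files have been seen in the collector's output
 * directories, and the length each had when last seen. A background
 * {@link CommitCheckThread} periodically lists the sink directory, any extra
 * paths named by chukwaCollector.asyncAcks.scanpaths, and the archive
 * processing input directory; {@code doGet} renders the results as an HTML
 * list. This backs the collector's asynchronous-acknowledgement support (the
 * chukwaCollector.asyncAcks.* options): clients can poll the "acks" page to
 * check whether their data has reached the filesystem.
 */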
public class CommitCheckServlet extends HttpServlet {

  private static final long serialVersionUID = -4627538252371890849L;

  protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
  CommitCheckThread commitCheck;
  Configuration conf;

  // interval at which to scan the filesystem, in ms
  public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";

  // delay after which a seen file is dropped from the report, in ms
  public static final String PURGEDELAY_OPT = "chukwaCollector.asyncAcks.purgedelay";

  // comma-separated list of additional directories to scan
  public static final String SCANPATHS_OPT = "chukwaCollector.asyncAcks.scanpaths";

  // path at which this servlet is exposed on the collector
  public static final String DEFAULT_PATH = "acks";

  public CommitCheckServlet(Configuration conf) {
    this.conf = conf;
  }

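  /**
   * Starts the background scan thread against the filesystem named by
   * writer.hdfs.filesystem (defaulting to the local filesystem).
   */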
  public void init(ServletConfig servletConf) throws ServletException {
    log.info("initing commit check servlet");
    try {
      FileSystem fs = FileSystem.get(
          new URI(conf.get("writer.hdfs.filesystem", "file:///")), conf);
      log.info("commitcheck fs is " + fs.getUri());
      commitCheck = new CommitCheckThread(conf, fs);
      commitCheck.start();
    } catch(Exception e) {
      log.error("couldn't start CommitCheckServlet", e);
      throw new ServletException(e);
    }
  }

  @Override
  protected void doTrace(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
  }

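  /**
   * Renders the current table of observed sink files and their lengths as a
   * simple HTML list.
   */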
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {

    PrintStream out = new PrintStream(resp.getOutputStream());
    resp.setStatus(200);

    out.println("<html><body><h2>Commit status</h2><ul>");
    for(String s: commitCheck.getLengthList())
      out.println("<li>" + s + "</li>");
    out.println("</ul></body></html>");
    out.flush(); // PrintStream buffers internally; flush so the page body is actually sent
  }

  @Override
  public void destroy() {
    commitCheck.shutdown();
  }

  /**
   * Ideally, we'd use zookeeper to monitor archiver/demux rotation.
   * For now, instead, we'll just do an ls in a bunch of places.
   */
  private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
    int checkInterval = 1000 * 30;
    volatile boolean running = true;
    final Collection<Path> pathsToSearch;
    final FileSystem fs;
    final Map<String, Long> lengthTable;
    final PriorityQueue<PurgeTask> oldEntries;
    long delayUntilPurge = 1000 * 60 * 60 * 12;

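    /**
     * Records the length observed for a file, together with the time at which
     * that observation should be dropped from the table; ordered by purge time.
     */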
    static class PurgeTask implements Comparable<PurgeTask> {
      long purgeTime;
      String toPurge;
      long len;

      public PurgeTask(String s, long time, long len) {
        this.toPurge = s;
        this.purgeTime = time;
        this.len = len;
      }

      public int compareTo(PurgeTask p) {
        if(purgeTime < p.purgeTime)
          return -1;
        else if(purgeTime == p.purgeTime)
          return 0;
        else
          return 1;
      }
    }

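    /**
     * Builds the list of directories to scan: the collector sink directory,
     * any additional paths named by chukwaCollector.asyncAcks.scanpaths, and
     * the archive processing MR input directory under the Chukwa root.
     */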
    public CommitCheckThread(Configuration conf, FileSystem fs) {
      this.fs = fs;
      pathsToSearch = new ArrayList<Path>();
      lengthTable = new LinkedHashMap<String, Long>();
      oldEntries = new PriorityQueue<PurgeTask>();
      checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);

      String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
      pathsToSearch.add(new Path(sinkPath));

      String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
      String[] paths = additionalSearchPaths.split(",");
      for(String s: paths) {
        if(s.length() > 1) {
          Path path = new Path(s);
          if(!pathsToSearch.contains(path))
            pathsToSearch.add(path);
        }
      }

      delayUntilPurge = conf.getLong(PURGEDELAY_OPT, delayUntilPurge);
      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
      String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
      String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
      pathsToSearch.add(new Path(archivesMRInputDir));
    }

    public void shutdown() {
      running = false;
      this.interrupt();
    }

    public void run() {
      while(running) {
        try {
          Thread.sleep(checkInterval);
          scanFS();
          purgeOldEntries();
        } catch(InterruptedException e) {
          // interrupted by shutdown(); the while condition will see running == false
        } catch(IOException e) {
          log.error("io problem", e);
        }
      }
    }

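    /**
     * Drops entries whose purge time has passed. If a file has grown since its
     * purge task was queued, the entry is kept; a later purge task carrying the
     * newer length will remove it.
     */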
    private synchronized void purgeOldEntries() {
      long now = System.currentTimeMillis();
      PurgeTask p = oldEntries.peek();
      while(p != null && p.purgeTime < now) {
        oldEntries.remove();
        Long curLen = lengthTable.get(p.toPurge);
        if(curLen != null && p.len >= curLen)
          lengthTable.remove(p.toPurge);
        p = oldEntries.peek(); // advance to the next queued task; otherwise this loops forever
      }
    }

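    /**
     * Lists each search directory for data sink files, records each file's
     * current length, and queues a purge task so the entry eventually expires.
     */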
    private void scanFS() throws IOException {
      long nextPurgeTime = System.currentTimeMillis() + delayUntilPurge;
      for(Path dir: pathsToSearch) {
        int filesSeen = 0;

        FileStatus[] dataSinkFiles = fs.listStatus(dir, SinkArchiver.DATA_SINK_FILTER);
        if(dataSinkFiles == null || dataSinkFiles.length == 0)
          continue;

        synchronized(this) {
          for(FileStatus fstatus: dataSinkFiles) {
            filesSeen++;
            String name = fstatus.getPath().getName();
            long len = fstatus.getLen();
            oldEntries.add(new PurgeTask(name, nextPurgeTime, len));
            lengthTable.put(name, len);
          }
        }
        log.info("scanning fs: " + dir + "; saw " + filesSeen + " files");
      }
    }

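    /**
     * Returns one "filename length" entry per tracked file, for display by doGet.
     */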
    public synchronized List<String> getLengthList() {
      ArrayList<String> list = new ArrayList<String>(lengthTable.size());
      for(Map.Entry<String, Long> e: lengthTable.entrySet()) {
        list.add(e.getKey() + " " + e.getValue());
      }
      return list;
    }

  }

}