1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.net.URI;
23 import javax.servlet.ServletConfig;
24 import javax.servlet.ServletException;
25 import javax.servlet.http.HttpServlet;
26 import javax.servlet.http.HttpServletRequest;
27 import javax.servlet.http.HttpServletResponse;
28 import org.apache.log4j.Logger;
29 import java.util.*;
30 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
31 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.*;
35
36 public class CommitCheckServlet extends HttpServlet {
37
38 private static final long serialVersionUID = -4627538252371890849L;
39
40 protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
41 CommitCheckThread commitCheck;
42 Configuration conf;
43
44 public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
45
46
47 public static final String PURGEDELAY_OPT = "chukwaCollector.asyncAcks.purgedelay";
48
49
50 public static final String SCANPATHS_OPT = "chukwaCollector.asyncAcks.scanpaths";
51
52 public static final String DEFAULT_PATH = "acks";
53 public CommitCheckServlet(Configuration conf) {
54 this.conf = conf;
55 }
56
57 public void init(ServletConfig servletConf) throws ServletException {
58 log.info("initing commit check servlet");
59 try {
60 FileSystem fs = FileSystem.get(
61 new URI(conf.get("writer.hdfs.filesystem", "file:///")), conf);
62 log.info("commitcheck fs is " + fs.getUri());
63 commitCheck = new CommitCheckThread(conf, fs);
64 commitCheck.start();
65 } catch(Exception e) {
66 log.error("couldn't start CommitCheckServlet", e);
67 throw new ServletException(e);
68 }
69 }
70
71 @Override
72 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
73 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
74 }
75
76 @Override
77 protected void doGet(HttpServletRequest req, HttpServletResponse resp)
78 throws ServletException, IOException {
79
80 PrintStream out = new PrintStream(resp.getOutputStream());
81 resp.setStatus(200);
82
83 out.println("<html><body><h2>Commit status</h2><ul>");
84 for(String s: commitCheck.getLengthList())
85 out.println("<li>" + s + "</li>");
86 out.println("</ul></body></html>");
87 }
88
89
90 @Override
91 public void destroy() {
92 commitCheck.shutdown();
93 }
94
95
96
97
98
99 private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
100 int checkInterval = 1000 * 30;
101 volatile boolean running = true;
102 final Collection<Path> pathsToSearch;
103 final FileSystem fs;
104 final Map<String, Long> lengthTable;
105 final PriorityQueue<PurgeTask> oldEntries;
106 long delayUntilPurge = 1000 * 60 * 60 * 12;
107
108 static class PurgeTask implements Comparable<PurgeTask>{
109 long purgeTime;
110 String toPurge;
111 long len;
112
113 public PurgeTask(String s, long time, long len) {
114 this.toPurge = s;
115 this.purgeTime = time;
116 this.len = len;
117 }
118
119 public int compareTo(PurgeTask p) {
120 if(purgeTime < p.purgeTime)
121 return -1;
122 else if (purgeTime == p.purgeTime)
123 return 0;
124 else
125 return 1;
126 }
127 }
128
129
130 public CommitCheckThread(Configuration conf, FileSystem fs) {
131 this.fs = fs;
132 pathsToSearch = new ArrayList<Path>();
133 lengthTable = new LinkedHashMap<String, Long>();
134 oldEntries = new PriorityQueue<PurgeTask>();
135 checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
136
137 String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
138 pathsToSearch.add(new Path(sinkPath));
139
140 String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
141 String[] paths = additionalSearchPaths.split(",");
142 for(String s: paths)
143 if(s.length() > 1) {
144 Path path = new Path(s);
145 if(!pathsToSearch.contains(path))
146 pathsToSearch.add(path);
147 }
148
149 delayUntilPurge = conf.getLong(PURGEDELAY_OPT, delayUntilPurge);
150 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
151 String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
152 String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
153 pathsToSearch.add(new Path(archivesMRInputDir));
154
155 }
156
157 public void shutdown() {
158 running = false;
159 this.interrupt();
160 }
161
162 public void run() {
163 while(running) {
164 try {
165 Thread.sleep(checkInterval);
166 scanFS();
167 purgeOldEntries();
168 } catch(InterruptedException e) {}
169 catch(IOException e) {
170 log.error("io problem", e);
171 }
172 }
173 }
174
175 private synchronized void purgeOldEntries() {
176 long now = System.currentTimeMillis();
177 PurgeTask p = oldEntries.peek();
178 while(p != null && p.purgeTime < now) {
179 oldEntries.remove();
180 Long curLen = lengthTable.get(p.toPurge);
181 if(curLen != null && p.len >= curLen)
182 lengthTable.remove(p.toPurge);
183 }
184
185 }
186
187 private void scanFS() throws IOException {
188 long nextPurgeTime = System.currentTimeMillis() + delayUntilPurge;
189 for(Path dir: pathsToSearch) {
190 int filesSeen = 0;
191
192 FileStatus[] dataSinkFiles = fs.listStatus(dir, SinkArchiver.DATA_SINK_FILTER);
193 if(dataSinkFiles == null || dataSinkFiles.length == 0)
194 continue;
195
196 synchronized(this) {
197 for(FileStatus fstatus: dataSinkFiles) {
198 filesSeen++;
199 String name = fstatus.getPath().getName();
200 long len = fstatus.getLen();
201 oldEntries.add(new PurgeTask(name, nextPurgeTime, len));
202 lengthTable.put(name, len);
203 }
204 }
205 log.info("scanning fs: " + dir + "; saw "+ filesSeen+ " files");
206 }
207 }
208
209 public synchronized List<String> getLengthList() {
210 ArrayList<String> list = new ArrayList<String>(lengthTable.size());
211 for(Map.Entry<String, Long> e: lengthTable.entrySet()) {
212 list.add(e.getKey() + " " + e.getValue());
213 }
214 return list;
215 }
216
217 }
218
219 }