1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.*;
23 import java.util.regex.*;
24 import org.apache.hadoop.chukwa.*;
25 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
26 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
27 import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor;
28 import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.TestRawAdaptor;
29 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
30 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
31 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
32 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
33 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
34 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
35 import org.apache.hadoop.chukwa.datacollection.sender.*;
36 import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
37 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
38 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
39 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
40 import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
41 import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
42 import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.io.SequenceFile;
48 import org.mortbay.jetty.Server;
49 import org.mortbay.jetty.servlet.Context;
50 import org.mortbay.jetty.servlet.ServletHolder;
51 import junit.framework.TestCase;
52 import static org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender.DelayedCommit;
53 import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
54
55 public class TestDelayedAcks extends TestCase {
56
57 static final int PORTNO = 9993;
58 static int END2END_TEST_SECS = 30;
59 static int SEND_RATE = 180* 1000;
60 static int CLIENT_SCANPERIOD = 1000;
61 static int SERVER_SCANPERIOD = 1000;
62 static int ROTATEPERIOD = 2000;
63
64 int ACK_TIMEOUT = 200;
65
66
67
68
69 public void testAdaptorTimeout() throws Exception {
70 Configuration conf = new Configuration();
71 conf.set("chukwaAgent.control.port", "0");
72 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
73 conf.setInt("chukwaAgent.adaptor.context.switch.time", 500);
74 conf.setInt(AdaptorResetThread.TIMEOUT_OPT, ACK_TIMEOUT);
75
76 ChukwaAgent agent = new ChukwaAgent(conf);
77 ChunkCatcherConnector chunks = new ChunkCatcherConnector();
78 chunks.start();
79 assertEquals(0, agent.adaptorCount());
80 File testFile = makeTestFile("testDA", 50, new File(System.getProperty("test.build.data", "/tmp")));
81 long len = testFile.length();
82 System.out.println("wrote data to " + testFile);
83 AdaptorResetThread restart = new AdaptorResetThread(conf, agent);
84
85 agent.processAddCommand("add fta = "+ FileTailingAdaptor.class.getCanonicalName()
86 + " testdata " + testFile.getCanonicalPath() + " 0" );
87
88
89 assertEquals(1, agent.adaptorCount());
90 Chunk c1 = chunks.waitForAChunk();
91 assertNotNull(c1);
92 List<CommitListEntry> pendingAcks = new ArrayList<CommitListEntry>();
93 pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(),
94 c1.getData().length, "foo", c1.getSeqID(), agent.getAdaptorName(c1.getInitiator())));
95 restart.reportPending(pendingAcks);
96
97 assertEquals(len, c1.getData().length);
98 Thread.sleep(ACK_TIMEOUT*2);
99 int resetCount = restart.resetTimedOutAdaptors(ACK_TIMEOUT);
100 Chunk c2 = chunks.waitForAChunk(1000);
101 assertNotNull(c2);
102 assertEquals(len, c2.getData().length);
103 assertTrue(resetCount > 0);
104 agent.shutdown();
105
106 testFile.delete();
107 }
108
109
110
111
112 public void testDelayedAck() throws Exception {
113 Configuration conf = new Configuration();
114
115 SeqFileWriter writer = new SeqFileWriter();
116
117 conf.set("writer.hdfs.filesystem", "file:///");
118
119 File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
120 if (!tempDir.exists()) {
121 tempDir.mkdirs();
122 }
123
124 String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis();
125
126 String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
127 conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
128
129 writer.init(conf);
130 ArrayList<Chunk> oneChunk = new ArrayList<Chunk>();
131 oneChunk.add(new ChunkImpl("dt", "name", 1, new byte[] {'b'}, null));
132
133 ChukwaWriter.CommitStatus cs = writer.add(oneChunk);
134 writer.close();
135
136 File seqWriterFile = null;
137 File directory = new File(seqWriterOutputDir);
138 String[] files = directory.list();
139 for(String file: files) {
140 if ( file.endsWith(".done") ){
141 seqWriterFile = new File(directory, file);
142 break;
143 }
144 }
145 long lenWritten = seqWriterFile.length();
146 System.out.println("wrote " + lenWritten+ " bytes");
147 assertTrue(cs instanceof ChukwaWriter.COMMIT_PENDING);
148 ChukwaWriter.COMMIT_PENDING pending = (ChukwaWriter.COMMIT_PENDING) cs;
149 assertTrue(pending.pendingEntries.size() == 1);
150 String res = pending.pendingEntries.get(0);
151 System.out.println("result was " + res);
152
153 Pattern expectedPat= Pattern.compile(".* ([0-9]+)\n");
154 Matcher match = expectedPat.matcher(res);
155 assertTrue(match.matches());
156 long bytesPart = Long.parseLong(match.group(1));
157 assertEquals(bytesPart, lenWritten);
158 }
159
160
161 public static Server startCollectorOnPort(Configuration conf, int port,
162 ServletCollector collector) throws Exception {
163 Server server = new Server(port);
164
165 Context root = new Context(server, "/", Context.SESSIONS);
166 root.addServlet(new ServletHolder(collector), "/*");
167 root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
168
169 server.start();
170 server.setStopAtShutdown(false);
171 return server;
172 }
173
174
175 public static String buildConf(Configuration conf) {
176 File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
177 if (!tempDir.exists()) {
178 tempDir.mkdirs();
179 }
180
181 String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis() ;
182
183 conf.setInt("chukwaCollector.rotateInterval", ROTATEPERIOD);
184 conf.set("writer.hdfs.filesystem", "file:///");
185 String seqWriterOutputDir = outputDirectory +"/chukwa_sink";
186 conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
187 conf.setInt(AsyncAckSender.POLLPERIOD_OPT, CLIENT_SCANPERIOD);
188 conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, SERVER_SCANPERIOD);
189 conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);
190 conf.setInt(HttpConnector.MIN_POST_INTERVAL_OPT, 100);
191 conf.setInt(HttpConnector.MAX_SIZE_PER_POST_OPT, 10 * 1000*1000);
192 conf.setInt(SeqFileWriter.STAT_PERIOD_OPT, 60*60*24);
193 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
194
195 conf.set(AsyncAckSender.POLLHOSTS_OPT, "afilethatdoesntexist");
196
197 conf.setInt("chukwaAgent.control.port", 0);
198 return outputDirectory;
199 }
200
201 public void testEndToEnd() {
202 try {
203 Configuration conf = new Configuration();
204
205 String outputDirectory = buildConf(conf);
206 ServletCollector collector = new ServletCollector(conf);
207 Server collectorServ = startCollectorOnPort(conf, PORTNO, collector);
208 Thread.sleep(1000);
209
210 ChukwaAgent agent = new ChukwaAgent(conf);
211 HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
212 conn.start();
213 String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() +
214 " testData "+ SEND_RATE + " 0");
215 assertTrue("adaptor_constSend".equals(resp));
216 Thread.sleep(END2END_TEST_SECS * 1000);
217
218
219 assertNotNull(agent.getAdaptor("adaptor_constSend"));
220 long bytesOutput = agent.getAdaptor("adaptor_constSend").shutdown(AdaptorShutdownPolicy.GRACEFULLY);
221 Thread.sleep(CLIENT_SCANPERIOD + SERVER_SCANPERIOD + ROTATEPERIOD + 3000);
222
223 String[] stat = agent.getAdaptorList().get("adaptor_constSend").split(" ");
224 long bytesCommitted = Long.valueOf(stat[stat.length -1]);
225
226 long bytesPerSec = bytesOutput / (1000 * END2END_TEST_SECS);
227 System.out.println("data rate was " + bytesPerSec + " kb /second");
228
229
230 System.out.println(bytesCommitted + " bytes committed");
231 System.out.println(bytesOutput + " bytes output");
232 System.out.println("difference is " + (bytesOutput - bytesCommitted));
233 ChukwaWriter w = collector.getWriter();
234 long bytesWritten = ((SeqFileWriter)w).getBytesWritten();
235 System.out.println("collector wrote " + bytesWritten);
236
237 assertEquals(bytesCommitted, bytesOutput);
238 assertEquals(bytesWritten, bytesCommitted);
239
240
241 assertTrue(bytesPerSec > 9 * SEND_RATE/ 1000 / 10);
242 AsyncAckSender sender = (AsyncAckSender)conn.getSender();
243 assertEquals(0, sender.adaptorReset.getResetCount());
244
245 agent.shutdown();
246 collectorServ.stop();
247 conn.shutdown();
248 Thread.sleep(5000);
249 TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
250
251 } catch (Exception e) {
252 e.printStackTrace();
253 fail(e.toString());
254 }
255
256 }
257
258
259 }