1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector;
19 import java.io.File;
20 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
21 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
22 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
23 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
24 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
25 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
26 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
27 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
28 import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
29 import org.apache.hadoop.conf.Configuration;
30 import org.mortbay.jetty.Server;
31 import junit.framework.TestCase;
32
33
34
35 public class TestAdaptorTimeout extends TestCase {
36 static final int PORTNO = 9997;
37 static final int TEST_DURATION_SECS = 30;
38 static int SEND_RATE = 10* 1000;
39
40 public void testAdaptorTimeout() throws Exception {
41 Configuration conf = new Configuration();
42
43 String outputDirectory = TestDelayedAcks.buildConf(conf);
44 conf.setInt(AdaptorResetThread.TIMEOUT_OPT, 1000);
45 ServletCollector collector = new ServletCollector(conf);
46 Server collectorServ = TestDelayedAcks.startCollectorOnPort(conf, PORTNO, collector);
47 Thread.sleep(1000);
48
49 ChukwaAgent agent = new ChukwaAgent(conf);
50 HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
51 conn.start();
52 String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() +
53 " testData "+ SEND_RATE + " 0");
54 assertTrue("adaptor_constSend".equals(resp));
55 Thread.sleep(TEST_DURATION_SECS * 1000);
56
57 AsyncAckSender sender = (AsyncAckSender)conn.getSender();
58 int resets = sender.adaptorReset.getResetCount();
59 System.out.println(resets + " resets");
60 assertTrue(resets > 0);
61
62 agent.shutdown();
63 collectorServ.stop();
64 conn.shutdown();
65 Thread.sleep(5000);
66
67 long dups = TestFailedCollectorAck.checkDirs(conf, conf.get(SeqFileWriter.OUTPUT_DIR_OPT));
68 assertTrue(dups > 0);
69 TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
70 }
71
72 }