1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.File;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
27 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.log4j.Level;
30 import org.apache.log4j.Logger;
31 import junit.framework.TestCase;
32 import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
33
34 public class TestFileAdaptor extends TestCase {
35
36 Configuration conf = new Configuration();
37 static File baseDir;
38 File testFile;
39 ChunkCatcherConnector chunks;
40
41 public TestFileAdaptor() throws IOException {
42 baseDir = new File(System.getProperty("test.build.data", "/tmp"));
43 conf.setInt("chukwaAgent.control.port", 0);
44 conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
45 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
46 conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
47 conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
48 testFile = makeTestFile("test", 10, baseDir);
49
50 chunks = new ChunkCatcherConnector();
51 chunks.start();
52 }
53
54
55 public void testOnce() throws IOException,
56 ChukwaAgent.AlreadyRunningException, InterruptedException {
57
58 ChukwaAgent agent = new ChukwaAgent(conf);
59
60 assertEquals(0, agent.adaptorCount());
61
62 String name =agent.processAddCommand("add test = FileAdaptor raw " +testFile.getCanonicalPath() + " 0");
63 assertEquals(1, agent.adaptorCount());
64 assertEquals(name, "adaptor_test");
65 Chunk c = chunks.waitForAChunk(5000);
66 assertNotNull(c);
67 String dat = new String(c.getData());
68 assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
69 assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
70 assertTrue(c.getDataType().equals("raw"));
71 agent.shutdown();
72 }
73
74 public void testRepeatedly() throws IOException,
75 ChukwaAgent.AlreadyRunningException, InterruptedException {
76 int tests = 10;
77
78 ChukwaAgent agent = new ChukwaAgent(conf);
79 for(int i=0; i < tests; ++i) {
80 if(i % 100 == 0)
81 System.out.println("buzzed " + i + " times");
82
83 assertEquals(0, agent.adaptorCount());
84 String name = agent.processAddCommand("add test = FileAdaptor raw " +testFile.getCanonicalPath() + " 0");
85 assertEquals(1, agent.adaptorCount());
86 assertEquals(name, "adaptor_test");
87 Chunk c = chunks.waitForAChunk(5000);
88 assertNotNull(c);
89 String dat = new String(c.getData());
90 assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
91 assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
92 assertTrue(c.getDataType().equals("raw"));
93 while(agent.adaptorCount() > 0) {
94 agent.stopAdaptor("adaptor_test", false);
95 Thread.sleep(1000);
96 }
97 }
98 agent.shutdown();
99 }
100
101 }