1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20
21 import java.io.*;
22
23 import junit.framework.TestCase;
24 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25 import java.util.Map;
26 import java.util.Iterator;
27 import org.apache.hadoop.chukwa.Chunk;
28 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
29 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
30 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
31 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
32 import org.apache.hadoop.conf.Configuration;
33 import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
34
35 public class TestFileTailingAdaptors extends TestCase {
36 ChunkCatcherConnector chunks;
37 Configuration conf = new Configuration();
38 File baseDir, testFile;
39
40 public TestFileTailingAdaptors() throws IOException {
41 chunks = new ChunkCatcherConnector();
42 chunks.start();
43 baseDir = new File(System.getProperty("test.build.data", "/tmp"));
44 conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
45 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
46 conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
47 conf.set("chukwaAgent.control.port", "0");
48
49 testFile = makeTestFile("chukwaCrSepTest", 80,baseDir);
50
51 }
52
53 public void testCrSepAdaptor() throws IOException, InterruptedException,
54 ChukwaAgent.AlreadyRunningException {
55 ChukwaAgent agent = new ChukwaAgent(conf);
56
57
58
59 Thread.sleep(5000);
60 assertEquals(0, agent.adaptorCount());
61 String adaptorId = agent
62 .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
63 + " lines " + testFile + " 0");
64 assertNotNull(adaptorId);
65 assertEquals(1, agent.adaptorCount());
66
67 System.out.println("getting a chunk...");
68 Chunk c = chunks.waitForAChunk();
69 System.out.println("got chunk");
70 while (!c.getDataType().equals("lines")) {
71 c = chunks.waitForAChunk();
72 }
73 assertTrue(c.getSeqID() == testFile.length());
74 assertTrue(c.getRecordOffsets().length == 80);
75 int recStart = 0;
76 for (int rec = 0; rec < c.getRecordOffsets().length; ++rec) {
77 String record = new String(c.getData(), recStart,
78 c.getRecordOffsets()[rec] - recStart + 1);
79 assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
80 recStart = c.getRecordOffsets()[rec] + 1;
81 }
82 assertTrue(c.getDataType().equals("lines"));
83 agent.stopAdaptor(adaptorId, false);
84 agent.shutdown();
85 Thread.sleep(2000);
86 }
87
88 public void testRepeatedlyOnBigFile() throws IOException,
89 ChukwaAgent.AlreadyRunningException, InterruptedException {
90 int tests = 10;
91
92 ChukwaAgent agent = new ChukwaAgent(conf);
93 for(int i=0; i < tests; ++i) {
94 if(i % 100 == 0)
95 System.out.println("buzzed " + i + " times");
96
97 assertEquals(0, agent.adaptorCount());
98 agent.processAddCommand("add adaptor_test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
99 assertEquals(1, agent.adaptorCount());
100 Chunk c = chunks.waitForAChunk();
101 String dat = new String(c.getData());
102 assertTrue(dat.startsWith("0 abcdefghijklmnopqrstuvwxyz"));
103 assertTrue(dat.endsWith("9 abcdefghijklmnopqrstuvwxyz\n"));
104 assertTrue(c.getDataType().equals("raw"));
105 if(agent.adaptorCount() > 0)
106 agent.stopAdaptor("adaptor_test", false);
107 }
108 agent.shutdown();
109 }
110
111
112 public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
113 InterruptedException{
114 File testFile = makeTestFile("foo", 120,baseDir);
115 ChukwaAgent agent = new ChukwaAgent(conf);
116 assertEquals(0, agent.adaptorCount());
117 agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
118 assertEquals(1, agent.adaptorCount());
119 Thread.sleep(2000);
120 agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
121 assertEquals(1, agent.adaptorCount());
122 chunks.clear();
123 agent.shutdown();
124 }
125
126 }