1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.PrintWriter;
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
26 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
27 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
29 import org.apache.hadoop.conf.Configuration;
30 import junit.framework.TestCase;
31 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
32 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
33 import org.apache.log4j.Level;
34
35 public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
36
37 ChunkCatcherConnector chunks;
38
39 public TestRCheckAdaptor() {
40 chunks = new ChunkCatcherConnector();
41 chunks.start();
42 }
43
44 public void testBaseCases() throws IOException, InterruptedException,
45 ChukwaAgent.AlreadyRunningException {
46 Configuration conf = new Configuration();
47 conf.set("chukwaAgent.control.port", "0");
48 conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
49
50
51 File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
52 TestDirTailingAdaptor.createEmptyDir(baseDir);
53 File tmpOutput = new File(baseDir, "rotateTest.1");
54 PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
55 pw.println("First");
56 pw.close();
57 Thread.sleep(1000);
58 tmpOutput = new File(baseDir, "rotateTest");
59 pw = new PrintWriter(new FileOutputStream(tmpOutput));
60 pw.println("Second");
61 pw.close();
62
63
64 ChukwaAgent agent = new ChukwaAgent(conf);
65 String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
66 assertNotNull(adaptorID);
67
68 Chunk c = chunks.waitForAChunk(2000);
69 assertNotNull(c);
70 assertTrue(c.getData().length == 6);
71 assertTrue("First\n".equals(new String(c.getData())));
72 c = chunks.waitForAChunk(2000);
73 assertNotNull(c);
74 assertTrue(c.getData().length == 7);
75 assertTrue("Second\n".equals(new String(c.getData())));
76
77 pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
78 pw.println("Third");
79 pw.close();
80 c = chunks.waitForAChunk(2000);
81
82 assertNotNull(c);
83 assertTrue(c.getData().length == 6);
84 assertTrue("Third\n".equals(new String(c.getData())));
85 Thread.sleep(1500);
86
87 tmpOutput.renameTo(new File(baseDir, "rotateTest.2"));
88 pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
89 pw.println("Fourth");
90 pw.close();
91 c = chunks.waitForAChunk(2000);
92
93 assertNotNull(c);
94 System.out.println("got " + new String(c.getData()));
95 assertTrue("Fourth\n".equals(new String(c.getData())));
96
97 Thread.sleep(1500);
98
99 tmpOutput.renameTo(new File(baseDir, "rotateTest.3"));
100 Thread.sleep(400);
101 pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
102 pw.println("Fifth");
103 pw.close();
104 c = chunks.waitForAChunk(2000);
105 assertNotNull(c);
106 System.out.println("got " + new String(c.getData()));
107 assertTrue("Fifth\n".equals(new String(c.getData())));
108
109 agent.shutdown();
110 Thread.sleep(2000);
111 }
112
113
114 public void testContinuously() throws Exception {
115 File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
116 TestDirTailingAdaptor.createEmptyDir(baseDir);
117 File tmpOutput = new File(baseDir, "continuousTest");
118 PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
119 LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
120
121
122 RCheckFTAdaptor rca = new RCheckFTAdaptor();
123 rca.parseArgs("Test", tmpOutput.getAbsolutePath(), AdaptorManager.NULL);
124 rca.start("id", "Test", 0, this);
125
126
127 Thread.sleep(1000);
128 for(int i= 0; i < 200; ++i) {
129 Thread.sleep(120);
130 pw.println("This is line:" + i);
131 if( i % 5 == 0)
132 pw.flush();
133 if(i % 20 == 0) {
134 System.err.println("rotating");
135 pw.close();
136 tmpOutput.renameTo( new File(baseDir, "continuousTest."+(i/10)));
137 pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
138 }
139 }
140 Thread.sleep(1000);
141
142 rca.shutdown(AdaptorShutdownPolicy.HARD_STOP);
143
144 }
145
146 volatile int nextExpectedLine = 0;
147
148 @Override
149 public void add(Chunk event) throws InterruptedException {
150
151 String[] lines = new String(event.getData()).split("\n");
152 System.err.println("got chunk; " + lines.length + " lines " + event.getData().length + " bytes");
153 for(String line: lines) {
154 String n = line.substring(line.indexOf(':')+1);
155 int i = Integer.parseInt(n);
156
157 if(i != nextExpectedLine) {
158 System.err.println("lines out of order: saw " + i + " expected " + nextExpectedLine);
159 System.exit(0);
160 fail();
161 }
162 nextExpectedLine = i+1;
163
164 }
165 }
166
167 }