1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20
21 import java.io.*;
22 import junit.framework.TestCase;
23 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24 import java.util.Map;
25 import java.util.Iterator;
26 import org.apache.hadoop.chukwa.Chunk;
27 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
28 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
30 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
31 import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
32
33 public class TestLogRotate extends TestCase {
34 ChunkCatcherConnector chunks;
35
36 public TestLogRotate() {
37 chunks = new ChunkCatcherConnector();
38 chunks.start();
39 }
40
41 public void testLogRotate() throws IOException, InterruptedException,
42 ChukwaAgent.AlreadyRunningException {
43 ChukwaAgent agent = new ChukwaAgent();
44
45 ChukwaConfiguration cc = new ChukwaConfiguration();
46 int portno = cc.getInt("chukwaAgent.control.port", 9093);
47 ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
48 cli.removeAll();
49
50 Thread.sleep(5000);
51 File testFile = makeTestFile("chukwaLogRotateTest", 80);
52 String adaptorId = agent
53 .processAddCommand("add lr =org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
54 + " lines " + testFile + " 0");
55 assertTrue(adaptorId.equals("adaptor_lr"));
56 System.out.println("getting a chunk...");
57 Chunk c = chunks.waitForAChunk();
58 System.out.println("got chunk");
59 while (!c.getDataType().equals("lines")) {
60 c = chunks.waitForAChunk();
61 }
62 assertTrue(c.getSeqID() == testFile.length());
63 assertTrue(c.getRecordOffsets().length == 80);
64 int recStart = 0;
65 for (int rec = 0; rec < c.getRecordOffsets().length; ++rec) {
66 String record = new String(c.getData(), recStart,
67 c.getRecordOffsets()[rec] - recStart + 1);
68 System.out.println("record " + rec + " was: " + record);
69 assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
70 recStart = c.getRecordOffsets()[rec] + 1;
71 }
72 assertTrue(c.getDataType().equals("lines"));
73 testFile = makeTestFile("chukwaLogRotateTest", 40);
74 c = chunks.waitForAChunk();
75 System.out.println("got chunk");
76 while (!c.getDataType().equals("lines")) {
77 c = chunks.waitForAChunk();
78 }
79
80 assertTrue(c.getRecordOffsets().length == 40);
81 recStart = 0;
82 for (int rec = 0; rec < c.getRecordOffsets().length; ++rec) {
83 String record = new String(c.getData(), recStart,
84 c.getRecordOffsets()[rec] - recStart + 1);
85 System.out.println("record " + rec + " was: " + record);
86 assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
87 recStart = c.getRecordOffsets()[rec] + 1;
88 }
89 assertTrue(c.getDataType().equals("lines"));
90 agent.stopAdaptor(adaptorId, false);
91 agent.shutdown();
92 Thread.sleep(2000);
93 }
94
95
96 }