1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.PrintWriter;
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
26 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
27 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
28 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
29 import junit.framework.Assert;
30 import junit.framework.TestCase;
31
32 public class TestFileTailingAdaptorBigRecord extends TestCase {
33
34 ChunkCatcherConnector chunks;
35
36 public void testBigRecord() {
37 File f = null;
38 try {
39 File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
40 if (!tempDir.exists()) {
41 tempDir.mkdirs();
42 }
43 String logFile = tempDir.getPath() + "/Chukwa-bigRecord.txt";
44 f = makeTestFile(logFile);
45
46 chunks = new ChunkCatcherConnector();
47 chunks.start();
48
49
50 ChukwaConfiguration cc = new ChukwaConfiguration();
51 cc.set("chukwaAgent.control.port", "0");
52 cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
53 ChukwaAgent agent = new ChukwaAgent(cc);
54 int portno = agent.getControllerPort();
55 while (portno == -1) {
56 Thread.sleep(1000);
57 portno = agent.getControllerPort();
58 }
59
60
61 ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
62 cli.removeAll();
63
64
65 Thread.sleep(5000);
66 String adaptorId = agent
67 .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"
68 + " BigRecord " + logFile + " 0");
69 assertNotNull(adaptorId);
70
71 boolean record8Found = false;
72 Chunk c = null;
73
74
75 while (!record8Found) {
76 c = chunks.waitForAChunk();
77 String data = new String(c.getData());
78 if (c.getDataType().equals("BigRecord")
79 && data.indexOf("8 abcdefghijklmnopqrstuvwxyz") >= 0) {
80 record8Found = true;
81 }
82 }
83 agent.stopAdaptor(adaptorId, true);
84 agent.shutdown();
85 Thread.sleep(2000);
86 } catch (Exception e) {
87 Assert.fail("Exception in testBigRecord: " + e.getMessage());
88 } finally {
89 if (f != null) {
90 f.delete();
91 }
92 }
93 }
94
95 private File makeTestFile(String name) throws IOException {
96 File tmpOutput = new File(name);
97 FileOutputStream fos = new FileOutputStream(tmpOutput);
98
99 PrintWriter pw = new PrintWriter(fos);
100 for (int i = 0; i < 5; ++i) {
101 pw.print(i + " ");
102 pw.println("abcdefghijklmnopqrstuvwxyz");
103 }
104 pw.print("6 ");
105 for (int i = 0; i < 10; ++i) {
106 pw.print("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz");
107 }
108 pw.print("\n");
109 pw.print("7 ");
110 pw.println("abcdefghijklmnopqrstuvwxyz");
111 pw.print("8 ");
112 pw.println("abcdefghijklmnopqrstuvwxyz");
113
114 pw.flush();
115 pw.close();
116 return tmpOutput;
117 }
118
119 }