1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.Map;
24 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
25 import org.apache.hadoop.conf.*;
26 import org.apache.log4j.Level;
27 import org.apache.log4j.Logger;
28 import junit.framework.TestCase;
29
30 public class TestDirTailingAdaptor extends TestCase {
31
32 ChukwaAgent agent;
33 File baseDir;
34 static final int SCAN_INTERVAL = 1000;
35
36
37
38
39
40
41
42
43 public void testDirTailerFiltering() throws IOException,
44 ChukwaAgent.AlreadyRunningException, InterruptedException {
45
46 DirTailingAdaptor.log.setLevel(Level.DEBUG);
47
48 Configuration conf = new Configuration();
49 baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
50 File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
51 createEmptyDir(checkpointDir);
52
53 conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
54 conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
55 conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
56 conf.setInt("chukwaAgent.control.port", 0);
57 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
58
59 agent = new ChukwaAgent(conf);
60 File emptyDir = new File(baseDir, "emptyDir2");
61 createEmptyDir(emptyDir);
62
63 assertEquals(0, agent.adaptorCount());
64
65 agent.processAddCommand("add emptydir2= DirTailingAdaptor raw " + emptyDir + " *file filetailer.CharFileTailingAdaptorUTF8 0");
66 assertEquals(1, agent.adaptorCount());
67
68 File dirWithFile = new File(baseDir, "dir3");
69 dirWithFile.delete();
70 assertFalse("temp directory not empty",dirWithFile.exists());
71
72
73 dirWithFile.mkdir();
74 File inDir = File.createTempFile("atemp", "file", dirWithFile);
75 inDir.deleteOnExit();
76
77 File noreadFile = File.createTempFile("atemp", "noread", dirWithFile);
78 noreadFile.deleteOnExit();
79
80
81 agent.processAddCommand("add dir3= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
82 Thread.sleep(3000);
83 assertEquals(3, agent.adaptorCount());
84 agent.shutdown();
85
86 conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
87 Thread.sleep(1500);
88 File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
89 File aNewFile = File.createTempFile("new", "file", dirWithFile);
90 anOldFile.deleteOnExit();
91 aNewFile.deleteOnExit();
92 anOldFile.setLastModified(10);
93 agent = new ChukwaAgent(conf);
94
95 Thread.sleep(3 * SCAN_INTERVAL);
96 assertTrue(aNewFile.exists());
97
98
99 for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
100 System.out.println(adaptors.getKey() +": " + adaptors.getValue());
101 assertFalse(adaptors.getValue().contains("oldXYZ"));
102 }
103 Thread.sleep(3 * SCAN_INTERVAL);
104
105
106 assertEquals(4, agent.adaptorCount());
107 agent.shutdown();
108 Thread.sleep(1500);
109
110 nukeDirContents(checkpointDir);
111 checkpointDir.delete();
112 emptyDir.delete();
113 nukeDirContents(dirWithFile);
114 dirWithFile.delete();
115 }
116
117
118 public void testDirTailer() throws IOException,
119 ChukwaAgent.AlreadyRunningException, InterruptedException {
120
121 DirTailingAdaptor.log.setLevel(Level.DEBUG);
122
123 Configuration conf = new Configuration();
124 baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
125 File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
126 createEmptyDir(checkpointDir);
127
128 conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
129 conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
130 conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
131 conf.setInt("chukwaAgent.control.port", 0);
132 conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
133
134 boolean retry = true;
135 while(retry) {
136 try {
137 retry = false;
138 agent = new ChukwaAgent(conf);
139 } catch(Exception e) {
140 retry = true;
141 }
142 }
143 File emptyDir = new File(baseDir, "emptyDir");
144 createEmptyDir(emptyDir);
145
146 assertEquals(0, agent.adaptorCount());
147 agent.processAddCommand("add emptydir= DirTailingAdaptor raw " + emptyDir + " filetailer.CharFileTailingAdaptorUTF8 0");
148 assertEquals(1, agent.adaptorCount());
149
150 File dirWithFile = new File(baseDir, "dir2");
151 dirWithFile.delete();
152 assertFalse("temp directory not empty",dirWithFile.exists());
153
154 dirWithFile.mkdir();
155 File inDir = File.createTempFile("atemp", "file", dirWithFile);
156 inDir.deleteOnExit();
157 agent.processAddCommand("add dir2= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
158 Thread.sleep(3000);
159 assertEquals(3, agent.adaptorCount());
160 System.out.println("DirTailingAdaptor looks OK before restart");
161 agent.shutdown();
162
163 conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
164 Thread.sleep(1500);
165 File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
166 File aNewFile = File.createTempFile("new", "file", dirWithFile);
167 anOldFile.deleteOnExit();
168 aNewFile.deleteOnExit();
169 anOldFile.setLastModified(10);
170 agent = new ChukwaAgent(conf);
171
172 Thread.sleep(3 * SCAN_INTERVAL);
173 assertTrue(aNewFile.exists());
174
175
176 for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
177 System.out.println(adaptors.getKey() +": " + adaptors.getValue());
178 assertFalse(adaptors.getValue().contains("oldXYZ"));
179 }
180
181
182 Thread.sleep(3 * SCAN_INTERVAL);
183 assertEquals(4, agent.adaptorCount());
184 agent.shutdown();
185
186 nukeDirContents(checkpointDir);
187 checkpointDir.delete();
188 emptyDir.delete();
189 nukeDirContents(dirWithFile);
190 dirWithFile.delete();
191 }
192
193
194 public static boolean nukeDirContents(File dir) {
195 if(dir.exists()) {
196 if(dir.isDirectory()) {
197 for(File f: dir.listFiles()) {
198 nukeDirContents(f);
199 f.delete();
200 }
201 } else
202 dir.delete();
203
204 return true;
205 }
206 return false;
207 }
208
209 public static void createEmptyDir(File dir) {
210 if(!nukeDirContents(dir))
211 dir.mkdir();
212 assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
213 }
214
215 }