/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.tools.backfilling;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
import org.apache.hadoop.chukwa.validationframework.util.MD5;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

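/**
 * Tests for {@link BackfillingLoader}: each case generates a log file,
 * backfills it through a Chukwa adaptor, and verifies that the renamed
 * ".sav" input, the ".done" data sink, and the last sequence ID are all
 * consistent with the original file.
 */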
public class TestBackfillingLoader extends TestCase {

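  /**
   * Backfills a 107-line file through CharFileTailingAdaptorUTF8NewLineEscaped
   * and checks the data sink against the original input.
   */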
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 107;
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input: " + logFile);
      System.out.println("Size: " + size);
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine,
          adaptorName, recordType, logFile);
      loader.process();

      // process() is expected to rename the consumed input file to <name>.sav.
      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      // The last sequence ID should equal the number of bytes backfilled.
      Assert.assertEquals(size, seqId);

    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

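  /**
   * Same scenario, but driven by FileAdaptor instead of the file tailing
   * adaptor, over a 118-line input.
   */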
  public void testBackfillingLoaderWithFileAdaptor() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 118;
      File inputFile = makeTestFile(dataDir + "/input/in2.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input: " + logFile);
      System.out.println("Size: " + size);
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine,
          adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in2.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);

    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

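  /**
   * Same as the first test but with a ~1M-line input, to exercise the
   * tailing adaptor on a large file.
   */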
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFile() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 1024 * 1024;
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input: " + logFile);
      System.out.println("Size: " + size);
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine,
          adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

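  /**
   * Large-file backfill routed through LocalWriter
   * (chukwaCollector.writerClass) writing to a local output directory.
   */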
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFileLocalWriter() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));
    // Use LocalWriter so the data sink is written to a local directory.
    conf.set("chukwaCollector.localOutputDir", dataDir + "/log/");
    conf.set("chukwaCollector.writerClass",
        "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
    conf.set("chukwaCollector.minPercentFreeDisk", "2");

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 1024 * 1024 * 2;
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input: " + logFile);
      System.out.println("Size: " + size);
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine,
          adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

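  /**
   * Reads every chunk from the .done sink file, checks its cluster, data type,
   * and source tags, dumps the payloads to dataSinkFile + ".dump", and compares
   * that dump's MD5 with the original log file's. Returns the last sequence ID
   * seen in the sink.
   */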
  protected long validateDataSink(FileSystem fs, Configuration conf, String dataSinkFile,
      File logFile, String cluster, String dataType, String source, String application)
      throws Throwable {
    SequenceFile.Reader reader = null;
    long lastSeqId = -1;
    BufferedWriter out = null;
    try {
      reader = new SequenceFile.Reader(fs, new Path(dataSinkFile), conf);
      ChukwaArchiveKey key = new ChukwaArchiveKey();
      ChunkImpl chunk = ChunkImpl.getBlankChunk();

      String dataSinkDumpName = dataSinkFile + ".dump";
      out = new BufferedWriter(new FileWriter(dataSinkDumpName));

      while (reader.next(key, chunk)) {
        Assert.assertEquals(cluster, RecordUtil.getClusterName(chunk));
        Assert.assertEquals(dataType, chunk.getDataType());
        Assert.assertEquals(source, chunk.getSource());

        out.write(new String(chunk.getData()));
        lastSeqId = chunk.getSeqID();
      }

      out.close();
      out = null;
      reader.close();
      reader = null;

      // The concatenated chunk payloads must be byte-identical to the original log.
      String dataSinkMD5 = MD5.checksum(new File(dataSinkDumpName));
      String logFileMD5 = MD5.checksum(logFile);
      Assert.assertEquals(logFileMD5, dataSinkMD5);
    } finally {
      if (out != null) {
        out.close();
      }
      if (reader != null) {
        reader.close();
      }
    }

    return lastSeqId;
  }

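  /**
   * Writes {@code size} numbered lines of the form
   * "&lt;i&gt; abcdefghijklmnopqrstuvwxyz" to {@code name} and returns the file.
   */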
  private File makeTestFile(String name, int size) throws IOException {
    File tmpOutput = new File(name);
    FileOutputStream fos = new FileOutputStream(tmpOutput);

    PrintWriter pw = new PrintWriter(fos);
    for (int i = 0; i < size; ++i) {
      pw.print(i + " ");
      pw.println("abcdefghijklmnopqrstuvwxyz");
    }
    pw.flush();
    pw.close();
    return tmpOutput;
  }

}