/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.tools.backfilling;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
import org.apache.hadoop.chukwa.validationframework.util.MD5;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

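/**
 * End-to-end tests for {@link BackfillingLoader}: each test writes a log file,
 * runs the loader with a given adaptor (and writer), then verifies that the
 * input file was renamed to *.sav and that the resulting data sink file
 * carries the expected cluster, data type, source, and byte-for-byte content.
 */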
public class TestBackfillingLoader extends TestCase {

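  /**
   * Backfills a small (107-line) file through the
   * CharFileTailingAdaptorUTF8NewLineEscaped adaptor and validates the sink.
   */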
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 107;
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input file: " + logFile);
      System.out.println("Input size: " + inputFile.length());
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine, adaptorName, recordType, logFile);
      loader.process();

      // The loader renames the consumed input file to <name>.sav.
      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      // Find the sealed (.done) data sink file produced by the collector writer.
      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      // The last sequence ID should equal the number of bytes consumed.
      Assert.assertEquals(size, seqId);

    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

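  /**
   * Backfills a small (118-line) file through the FileAdaptor, exercising a
   * second adaptor implementation against the same validation logic.
   */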
  public void testBackfillingLoaderWithFileAdaptor() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 118;
      File inputFile = makeTestFile(dataDir + "/input/in2.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input file: " + logFile);
      System.out.println("Input size: " + inputFile.length());
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine, adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in2.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);

    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

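  /**
   * Same scenario as the first test, but with a large (~34 MB) input file so
   * the adaptor must deliver the data across many chunks.
   */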
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFile() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 1024 * 1024; // ~34 MB of input
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input file: " + logFile);
      System.out.println("Input size: " + inputFile.length());
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine, adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

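  /**
   * Large-file scenario again, this time routed through the LocalWriter
   * (chukwaCollector.writerClass) instead of the default writer.
   */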
  public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscapedBigFileLocalWriter() {
    String tmpDir = System.getProperty("test.build.data", "/tmp");
    long ts = System.currentTimeMillis();
    String dataDir = tmpDir + "/TestBackfillingLoader_" + ts;

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");
    conf.set("chukwaCollector.outputDir", dataDir + "/log/");
    conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE - 1));
    conf.set("chukwaCollector.localOutputDir", dataDir + "/log/");
    conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
    conf.set("chukwaCollector.minPercentFreeDisk", "2"); // so unit tests pass on machines with nearly full disks

    String cluster = "MyCluster_" + ts;
    String machine = "machine_" + ts;
    String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
    String recordType = "MyRecordType_" + ts;

    try {
      FileSystem fs = FileSystem.getLocal(conf);

      File in1Dir = new File(dataDir + "/input");
      in1Dir.mkdirs();
      int lineCount = 1024 * 1024 * 2; // ~70 MB of input, twice the file above
      File inputFile = makeTestFile(dataDir + "/input/in1.txt", lineCount);
      long size = inputFile.length();

      String logFile = inputFile.getAbsolutePath();
      System.out.println("Input file: " + logFile);
      System.out.println("Input size: " + inputFile.length());
      BackfillingLoader loader = new BackfillingLoader(conf, cluster, machine, adaptorName, recordType, logFile);
      loader.process();

      File finalOutputFile = new File(dataDir + "/input/in1.txt.sav");

      Assert.assertFalse(inputFile.exists());
      Assert.assertTrue(finalOutputFile.exists());

      String doneFile = null;
      File directory = new File(dataDir + "/log/");
      String[] files = directory.list();
      for (String file : files) {
        if (file.endsWith(".done")) {
          doneFile = dataDir + "/log/" + file;
          break;
        }
      }
      Assert.assertNotNull("no .done file found in " + directory, doneFile);

      long seqId = validateDataSink(fs, conf, doneFile, finalOutputFile,
          cluster, recordType, machine, logFile);
      Assert.assertEquals(size, seqId);
    } catch (Throwable e) {
      e.printStackTrace();
      Assert.fail();
    }
    try {
      FileUtils.deleteDirectory(new File(dataDir));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
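
  /**
   * Reads every chunk back from the data sink SequenceFile, asserts each
   * chunk's cluster, data type, and source, dumps the payloads to a ".dump"
   * sibling file, and compares that dump's MD5 checksum against the saved log
   * file. Returns the sequence ID of the last chunk read; the application
   * parameter (the original log file path) is currently not validated.
   */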
  protected long validateDataSink(FileSystem fs, Configuration conf, String dataSinkFile, File logFile,
      String cluster, String dataType, String source, String application) throws Throwable {
    SequenceFile.Reader reader = null;
    long lastSeqId = -1;
    BufferedWriter out = null;
    try {
      reader = new SequenceFile.Reader(fs, new Path(dataSinkFile), conf);
      ChukwaArchiveKey key = new ChukwaArchiveKey();
      ChunkImpl chunk = ChunkImpl.getBlankChunk();

      String dataSinkDumpName = dataSinkFile + ".dump";
      out = new BufferedWriter(new FileWriter(dataSinkDumpName));

      while (reader.next(key, chunk)) {
        Assert.assertTrue(cluster.equals(RecordUtil.getClusterName(chunk)));
        Assert.assertTrue(dataType.equals(chunk.getDataType()));
        Assert.assertTrue(source.equals(chunk.getSource()));

        out.write(new String(chunk.getData()));
        lastSeqId = chunk.getSeqID();
      }

      out.close();
      out = null;
      reader.close();
      reader = null;

      String dataSinkMD5 = MD5.checksum(new File(dataSinkDumpName));
      String logFileMD5 = MD5.checksum(logFile);
      Assert.assertEquals(logFileMD5, dataSinkMD5);
    }
    finally {
      if (out != null) {
        out.close();
      }
      if (reader != null) {
        reader.close();
      }
    }

    return lastSeqId;
  }

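  /**
   * Writes the requested number of lines, each an index followed by the
   * alphabet, to the given path and returns the resulting file.
   */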
  private File makeTestFile(String name, int lineCount) throws IOException {
    File tmpOutput = new File(name);

    FileOutputStream fos = new FileOutputStream(tmpOutput);
    PrintWriter pw = new PrintWriter(fos);
    for (int i = 0; i < lineCount; ++i) {
      pw.print(i + " ");
      pw.println("abcdefghijklmnopqrstuvwxyz");
    }
    pw.flush();
    pw.close();
    return tmpOutput;
  }

}