View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.chukwa.datacollection.writer;
19  
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkBuilder;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
38  
39  
40  
41  
42  public class TestChukwaWriters extends TestCase{
43  
44    public void testWriters() {
45      try {
46        
47        Configuration conf = new Configuration();
48        FileSystem fs = FileSystem.getLocal(conf);
49  
50        
51        ChukwaWriter seqWriter = new SeqFileWriter();
52        ChukwaWriter localWriter = new LocalWriter();
53        
54        List<Chunk> chunksSeqWriter = new LinkedList<Chunk>();
55        List<Chunk> chunksLocalWriter = new LinkedList<Chunk>();
56        for(int i=0;i<10;i++) {
57          ChunkBuilder cb1 = new ChunkBuilder();
58          cb1.addRecord(("record-" +i) .getBytes());
59          cb1.addRecord("foo" .getBytes());
60          cb1.addRecord("bar".getBytes());
61          cb1.addRecord("baz".getBytes());
62          chunksSeqWriter.add(cb1.getChunk());
63          
64          ChunkBuilder cb2 = new ChunkBuilder();
65          cb2.addRecord(("record-" +i) .getBytes());
66          cb2.addRecord("foo" .getBytes());
67          cb2.addRecord("bar".getBytes());
68          cb2.addRecord("baz".getBytes());
69          chunksLocalWriter.add(cb2.getChunk());
70          
71        }
72        
73        File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
74        if (!tempDir.exists()) {
75          tempDir.mkdirs();
76        }
77        
78        String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testWriters_" + System.currentTimeMillis() + "/";
79        
80        
81        Configuration confSeqWriter = new Configuration();
82        confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
83        confSeqWriter.set("writer.hdfs.filesystem", "file:///");
84        String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
85        confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
86        
87        seqWriter.init(confSeqWriter);
88        Thread.sleep(5000);
89        seqWriter.add(chunksSeqWriter);
90        seqWriter.close();
91        
92        String seqWriterFile = null;
93        
94        File directory = new File(seqWriterOutputDir);
95        String[] files = directory.list();
96        for(String file: files) {
97          if ( file.endsWith(".done") ){
98            seqWriterFile = seqWriterOutputDir + File.separator + file;
99            break;
100         }
101       }
102       
103       Assert.assertFalse(seqWriterFile == null);
104       
105       String seqWriterDump = dumpArchive(fs,conf,seqWriterFile);
106       
107       Configuration confLocalWriter = new Configuration();
108       confSeqWriter.set("writer.hdfs.filesystem", "file:///");
109       String localWriterOutputDir = outputDirectory +"/localWriter/localOutputDir";
110       confLocalWriter.set("chukwaCollector.localOutputDir",localWriterOutputDir);
111       confLocalWriter.set("chukwaCollector.rotateInterval", "300000");
112       confLocalWriter.set("chukwaCollector.minPercentFreeDisk", "2");//so unit tests pass on 
113       //machines with mostly-full disks
114 
115       
116       String localWriterFile = null;
117       localWriter.init(confLocalWriter);
118       Thread.sleep(5000);
119       localWriter.add(chunksLocalWriter);
120       localWriter.close();
121 
122       directory = new File(localWriterOutputDir);
123       files = directory.list();
124       for(String file: files) {
125         if ( file.endsWith(".done") ){
126           localWriterFile = localWriterOutputDir + File.separator + file;
127           break;
128         }
129       }
130       
131       Assert.assertFalse(localWriterFile == null);
132       String localWriterDump = dumpArchive(fs,conf,localWriterFile);
133 
134       Assert.assertTrue(seqWriterDump.intern() == localWriterDump.intern());
135 
136       File fOutputDirectory = new File(outputDirectory);
137       fOutputDirectory.delete();
138     } catch (Throwable e) {
139       e.printStackTrace();
140       Assert.fail("Exception in TestChukwaWriters," + e.getMessage());
141     }
142     
143   }
144   
145   protected String dumpArchive(FileSystem fs,Configuration conf, String file) throws Throwable {
146     SequenceFile.Reader reader = null;
147     try {
148       reader = new SequenceFile.Reader(fs, new Path(file), conf);
149 
150       ChukwaArchiveKey key = new ChukwaArchiveKey();
151       ChunkImpl chunk = ChunkImpl.getBlankChunk();
152 
153       StringBuilder sb = new StringBuilder();
154       while (reader.next(key, chunk)) {
155         sb.append("\nTimePartition: " + key.getTimePartition());
156         sb.append("DataType: " + key.getDataType());
157         sb.append("StreamName: " + key.getStreamName());
158         sb.append("SeqId: " + key.getSeqId());
159         sb.append("\t\t =============== ");
160 
161         sb.append("Cluster : " + chunk.getTags());
162         sb.append("DataType : " + chunk.getDataType());
163         sb.append("Source : " + chunk.getSource());
164         sb.append("Application : " + chunk.getStreamName());
165         sb.append("SeqID : " + chunk.getSeqID());
166         sb.append("Data : " + new String(chunk.getData()));
167         return sb.toString();
168       }
169     } catch (Throwable e) {
170      Assert.fail("Exception while reading SeqFile"+ e.getMessage());
171      throw e;
172     }
173     
174     finally {
175       if (reader != null) {
176         reader.close();
177       }
178     }
179     return null;    
180   }
181 
182   /**
183    * Test to check if the .chukwa files are closing at the time we expect them
184    * to close. This test sets the rotateInterval and offsetInterval to small
185    * values, reads the filename of the first .chukwa file, extracts the
186    * timestamp from its name, calculates the timestamp when the next .chukwa
187    * file should be closed, sleeps for some time (enough for producing the next
188    * .chukwa file), reads the timestamp on the second .chukwa file, and
189    * compares the expected close timestamp with the actual closing timestamp of
190    * the second file.
191    */
192   public void testSeqWriterFixedCloseInterval() {
193     try {
194       long rotateInterval = 10000;
195       long intervalOffset = 3000;
196 
197       ChukwaWriter seqWriter = new SeqFileWriter();
198 
199       File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
200       if (!tempDir.exists()) {
201         tempDir.mkdirs();
202       }
203 
204       String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testSeqWriterFixedCloseInterval_" +
205               System.currentTimeMillis() + "/";
206 
207       Configuration confSeqWriter = new Configuration();
208       confSeqWriter.set("chukwaCollector.rotateInterval", String.valueOf(rotateInterval));
209       confSeqWriter.set("writer.hdfs.filesystem", "file:///");
210       String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
211       confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
212       confSeqWriter.set("chukwaCollector.isFixedTimeRotatorScheme", "true");
213       confSeqWriter.set("chukwaCollector.fixedTimeIntervalOffset", String.valueOf(intervalOffset));
214 
215       File directory = new File(seqWriterOutputDir);
216 
217       // if some files already exist in this directory then delete them. Files
218       // may exist due to an old test run.
219       File[] files = directory.listFiles();
220       if (files != null) {
221         for(File file: files) {
222           file.delete();
223         }
224       }
225 
226       // we do not want our test to fail due to a lag in calling the
227       // scheduleNextRotation() method and creating of first .chukwa file.
228       // So, we will make sure that the rotation starts in the middle (approx)
229       // of the rotateInterval
230       long currentTime = System.currentTimeMillis();
231       long currentTimeInSec = currentTime/1000;
232       long timeAfterPrevRotateInterval = currentTimeInSec % rotateInterval;
233       if(timeAfterPrevRotateInterval > (rotateInterval - 2)){
234         Thread.sleep(2000);
235       }
236 
237       seqWriter.init(confSeqWriter);
238       String [] fileNames = directory.list();
239       String firstFileName = "";
240       String initialTimestamp = "";
241       // extracting the close time of first .chukwa file. This timestamp can be
242       // extracted from the file name. An example filename is
243       // 20110531122600002_<host-name>_5f836ece1302899d9a0727e.chukwa
244       for(String file: fileNames) {
245         if ( file.endsWith(".chukwa") ){
246           // set a flag so that later we can identify that this file has been
247           // visited
248           firstFileName = file;
249           // getting just the timestamp part i.e. 20110531122600002 in the
250           // example filename mentioned in the above comment
251           initialTimestamp = file.split("_")[0];
252           // stripping off the millisecond part of timestamp. The timestamp
253           // now becomes 20110531122600
254           initialTimestamp = initialTimestamp.substring(0, initialTimestamp.length()-3);
255           break;
256         }
257       }
258 
259       SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddhhmmss");
260       Date initialDate = formatter.parse(initialTimestamp);
261 	    long initialDateInMillis = initialDate.getTime();
262 
263       // calculate the expected close time of the next .chukwa file.
264       long prevRoundedInterval = initialDateInMillis - (initialDateInMillis %
265               rotateInterval);
266       long expectedNextCloseDate = prevRoundedInterval +
267               rotateInterval + intervalOffset;
268 
269       // sleep for a time interval equal to (rotateInterval + offsetInterval).
270       // Only one more .chukwa file will be will be produced in this time
271       // interval.
272       long sleepTime = rotateInterval + intervalOffset;
273 
274       Thread.sleep(sleepTime);
275       fileNames = directory.list();
276       String nextTimestamp = "";
277       // extract the timestamp of the second .chukwa file
278       for(String file: fileNames) {
279         if ( file.endsWith(".chukwa") && !file.equals(firstFileName)){
280           nextTimestamp = file.split("_")[0];
281           nextTimestamp = nextTimestamp.substring(0, nextTimestamp.length()-3);
282           break;
283         }
284       }
285 
286       Date nextDate = formatter.parse(nextTimestamp);
287       long nextDateInMillis = nextDate.getTime();
288 
289       long threshold = 500; //milliseconds
290 
291       // test will be successful only if the timestamp on the second .chukwa
292       // file is very close (differs by < 500 ms) to the expected closing
293       // timestamp we calculated.
294       Assert.assertTrue("File not closed at expected time",
295               (nextDateInMillis - expectedNextCloseDate < threshold));
296       seqWriter.close();
297 
298     } catch (Throwable e) {
299       e.printStackTrace();
300       Assert.fail("Exception in TestChukwaWriters - " +
301               "testSeqFileFixedCloseInterval()," + e.getMessage());
302     }
303 }
304 
305   /**
306    * Test to check the calculation of the delay interval for rotation in
307    * SeqFileWriter. It uses an array of known currentTimestamps and their
308    * corresponding expectedRotateTimestamps (the next timestamp when the
309    * rotation should happen). The actual timestamp of next rotation is
310    * calculated by adding delay (obtained from getDelayForFixedInterval()) to
311    * the currentTimestamp.
312    */
313   public void testFixedIntervalOffsetCalculation(){
314     try{
315       SeqFileWriter seqFileWriter = new SeqFileWriter();
316       SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
317 
318       //rotateInterval >> offsetInterval
319       long rotateInterval = 300000; //5 min
320       long offsetInterval = 60000;  //1 min
321       long currentTimestamps[] = new long[5] ;
322       long expectedRotateTimestamps[] = new long[5];
323 
324       Date date = formatter.parse("2011/06/15 01:05:00");
325 	    currentTimestamps[0] = date.getTime();
326       expectedRotateTimestamps[0] = 1308125460000L; //2011/06/15 01:11:00
327 
328       date = formatter.parse("2011/06/15 01:06:00");
329 	    currentTimestamps[1] = date.getTime();
330       expectedRotateTimestamps[1] = 1308125460000L; //2011/06/15 01:11:00
331 
332       date = formatter.parse("2011/06/15 01:02:00");
333       currentTimestamps[2] = date.getTime();
334       expectedRotateTimestamps[2] = 1308125160000L; //2011/06/15 01:06:00
335 
336       date = formatter.parse("2011/06/15 01:04:00");
337       currentTimestamps[3] = date.getTime();
338       expectedRotateTimestamps[3] = 1308125160000L; //2011/06/15 01:06:00
339 
340       //edge case, when there is a change in the "hour"
341       date = formatter.parse("2011/06/15 01:56:00");
342       currentTimestamps[4] = date.getTime();
343       expectedRotateTimestamps[4] = 1308128460000L; //2011/06/15 02:01:00
344 
345       int i=0;
346       long expectedDelay = 0;
347       long actualRotateTimestamp = 0;
348       for(; i<5; i++){
349         expectedDelay = seqFileWriter.getDelayForFixedInterval(
350                 currentTimestamps[i], rotateInterval, offsetInterval);
351         actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
352         Assert.assertTrue("Incorrect value for delay",
353                 (actualRotateTimestamp==expectedRotateTimestamps[i]));
354       }
355 
356       //rotateInterval > offsetInterval
357       rotateInterval = 60000; //1 min
358       offsetInterval = 30000; //30 sec
359 
360       date = formatter.parse("2011/06/15 01:05:00");
361 	    currentTimestamps[0] = date.getTime();
362       expectedRotateTimestamps[0] = 1308125190000L; //2011/06/15 01:06:30
363 
364       date = formatter.parse("2011/06/15 01:04:30");
365 	    currentTimestamps[1] = date.getTime();
366       expectedRotateTimestamps[1] = 1308125130000L; //2011/06/15 01:05:30
367 
368       date = formatter.parse("2011/06/15 01:05:30");
369       currentTimestamps[2] = date.getTime();
370       expectedRotateTimestamps[2] = 1308125190000L; //2011/06/15 01:06:30
371 
372       date = formatter.parse("2011/06/15 01:04:00");
373       currentTimestamps[3] = date.getTime();
374       expectedRotateTimestamps[3] = 1308125130000L; //2011/06/15 01:05:30
375 
376       //edge case, when there is a change in the "hour"
377       date = formatter.parse("2011/06/15 01:59:30");
378       currentTimestamps[4] = date.getTime();
379       expectedRotateTimestamps[4] = 1308128430000L; //2011/06/15 02:00:30
380 
381       for(i=0; i<5; i++){
382         expectedDelay = seqFileWriter.getDelayForFixedInterval(
383                 currentTimestamps[i], rotateInterval, offsetInterval);
384         actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
385         Assert.assertTrue("Incorrect value for delay",
386                 (actualRotateTimestamp==expectedRotateTimestamps[i]));
387       }
388 
389       //rotateInterval = offsetInterval
390       rotateInterval = 60000; //1 min
391       offsetInterval = 60000; //1 min
392 
393       date = formatter.parse("2011/06/15 01:02:00");
394       currentTimestamps[0] = date.getTime();
395       expectedRotateTimestamps[0] = 1308125040000L; //2011/06/15 01:04:00
396 
397       date = formatter.parse("2011/06/15 01:02:30");
398       currentTimestamps[1] = date.getTime();
399       expectedRotateTimestamps[1] = 1308125040000L; //2011/06/15 01:04:00
400 
401       //edge case, when there is a change in the "hour"
402       date = formatter.parse("2011/06/15 01:59:30");
403       currentTimestamps[2] = date.getTime();
404       expectedRotateTimestamps[2] = 1308128460000L; //2011/06/15 02:01:00
405 
406       for(i=0; i<3; i++){
407         expectedDelay = seqFileWriter.getDelayForFixedInterval(
408                 currentTimestamps[i], rotateInterval, offsetInterval);
409         actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
410         Assert.assertTrue("Incorrect value for delay",
411                 (actualRotateTimestamp==expectedRotateTimestamps[i]));
412       }
413 
414       //rotateInterval < offsetInterval
415       rotateInterval = 60000; //1 min
416       offsetInterval = 120000; //2 min
417 
418       date = formatter.parse("2011/06/15 01:02:00");
419       currentTimestamps[0] = date.getTime();
420       expectedRotateTimestamps[0] = 1308125100000L; //2011/06/15 01:05:00
421 
422       date = formatter.parse("2011/06/15 01:02:30");
423       currentTimestamps[1] = date.getTime();
424       expectedRotateTimestamps[1] = 1308125100000L; //2011/06/15 01:05:00
425 
426       //edge case, when there is a change in the "hour"
427       date = formatter.parse("2011/06/15 01:59:30");
428       currentTimestamps[2] = date.getTime();
429       expectedRotateTimestamps[2] = 1308128520000L; //2011/06/15 02:02:00
430 
431       for(i=0; i<3; i++){
432         expectedDelay = seqFileWriter.getDelayForFixedInterval(
433                 currentTimestamps[i], rotateInterval, offsetInterval);
434         actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
435         Assert.assertTrue("Incorrect value for delay",
436                 (actualRotateTimestamp==expectedRotateTimestamps[i]));
437       }
438 
439     } catch (Throwable e) {
440       e.printStackTrace();
441       Assert.fail("Exception in TestChukwaWriters - " +
442               "testFixedIntervalOffsetCalculation()," + e.getMessage());
443     }
444   }
445 }