View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.util;
19  
20  import junit.framework.TestCase;
21  import java.io.File;
22  import java.util.ArrayList;
23  import java.util.Calendar;
24  import java.util.List;
25  import java.io.IOException;
26  
27  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
28  import org.apache.hadoop.chukwa.ChunkImpl;
29  import org.apache.hadoop.chukwa.util.CopySequenceFile;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.io.SequenceFile;
35  
36  public class TestCopySequenceFile extends TestCase {
37    File doneFile = null;
38    File tempDir = null;
39    String tempFileName = null;
40  	
41    public void testCopySequenceFile() throws IOException {
42      //Create a .chukwa sequence file 
43  	  
44      tempDir = new File(System.getProperty("test.build.data", "/tmp"));
45      File tempFile = File.createTempFile("testcopy", ".chukwa", tempDir); 
46      tempFile.deleteOnExit(); // Will delete this file if test fails and file is not renamed to .done
47      tempFileName=tempFile.getName();
48      Configuration conf = new Configuration();
49      Path path = new Path(tempFile.getAbsolutePath());
50      List<ChunkImpl> chunks = new ArrayList<ChunkImpl>();
51      byte[] dat = "test".getBytes();
52      
53      ChunkImpl c = new ChunkImpl("Data", "aname", dat.length, dat, null);
54      chunks.add(c);
55      
56      dat = "ing".getBytes();
57      c = new ChunkImpl("Data", "aname", dat.length+4, dat, null);
58      chunks.add(c);
59      
60      //Utilize the writeSeqFile method to create a valid .chukwa sequence file
61      
62      writeSeqFile(conf, FileSystem.getLocal(conf), path, chunks);
63      
64      //Call CopySequenceFile to convert .chukwa to .done
65      
66      CopySequenceFile.createValidSequenceFile(conf, tempDir.getAbsolutePath(), tempFile.getName(), FileSystem.getLocal(conf));
67  	
68      //Assert that the chukwa file has been deleted
69      
70  	assertFalse("File " + tempFile.getAbsolutePath() + " has not been deleted", tempFile.exists()) ; 
71  	
72  	String doneFilePath= tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".done");
73  	doneFile= new File(doneFilePath);
74  	
75  	//Assert that the done file has been created
76  	
77      assertTrue("File " + doneFilePath + " has not been created", doneFile.exists()); 
78  		
79    }
80    public static void writeSeqFile(Configuration conf, FileSystem fileSys, Path dest,
81  	                              List<ChunkImpl> chunks) throws IOException {
82      FSDataOutputStream out = fileSys.create(dest);
83  
84  	Calendar calendar = Calendar.getInstance();
85  	SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
86  	                                    ChukwaArchiveKey.class, ChunkImpl.class,
87  	                                    SequenceFile.CompressionType.NONE, null);
88  	    
89  	for (ChunkImpl chunk: chunks) {
90  	  ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
91  	      
92        calendar.set(Calendar.SECOND, 0);
93        calendar.set(Calendar.MILLISECOND, 0);
94        archiveKey.setTimePartition(calendar.getTimeInMillis());
95        
96        archiveKey.setDataType(chunk.getDataType());
97        archiveKey.setStreamName(chunk.getStreamName());
98        archiveKey.setSeqId(chunk.getSeqID());
99        seqFileWriter.append(archiveKey, chunk);
100 	}
101     seqFileWriter.close();
102     out.close();
103   }
104   protected void tearDown() {
105 	if (doneFile != null && doneFile.exists()){
106 		  doneFile.delete();
107 		} else { //Cleanup any files that may have been created during a failed copy attempt 
108 			File recoverFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recover"));
109 			if (recoverFile.exists()){
110 			  recoverFile.delete();
111 			} else {
112 			    File recoverDoneFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recoverDone"));
113 			    if (recoverDoneFile.exists()){
114 			      recoverDoneFile.delete();
115 			    }
116 			  }
117 		 }
118   }
119 }