1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.util;
19
20 import junit.framework.TestCase;
21 import java.io.File;
22 import java.util.ArrayList;
23 import java.util.Calendar;
24 import java.util.List;
25 import java.io.IOException;
26
27 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
28 import org.apache.hadoop.chukwa.ChunkImpl;
29 import org.apache.hadoop.chukwa.util.CopySequenceFile;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.io.SequenceFile;
35
36 public class TestCopySequenceFile extends TestCase {
37 File doneFile = null;
38 File tempDir = null;
39 String tempFileName = null;
40
41 public void testCopySequenceFile() throws IOException {
42
43
44 tempDir = new File(System.getProperty("test.build.data", "/tmp"));
45 File tempFile = File.createTempFile("testcopy", ".chukwa", tempDir);
46 tempFile.deleteOnExit();
47 tempFileName=tempFile.getName();
48 Configuration conf = new Configuration();
49 Path path = new Path(tempFile.getAbsolutePath());
50 List<ChunkImpl> chunks = new ArrayList<ChunkImpl>();
51 byte[] dat = "test".getBytes();
52
53 ChunkImpl c = new ChunkImpl("Data", "aname", dat.length, dat, null);
54 chunks.add(c);
55
56 dat = "ing".getBytes();
57 c = new ChunkImpl("Data", "aname", dat.length+4, dat, null);
58 chunks.add(c);
59
60
61
62 writeSeqFile(conf, FileSystem.getLocal(conf), path, chunks);
63
64
65
66 CopySequenceFile.createValidSequenceFile(conf, tempDir.getAbsolutePath(), tempFile.getName(), FileSystem.getLocal(conf));
67
68
69
70 assertFalse("File " + tempFile.getAbsolutePath() + " has not been deleted", tempFile.exists()) ;
71
72 String doneFilePath= tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".done");
73 doneFile= new File(doneFilePath);
74
75
76
77 assertTrue("File " + doneFilePath + " has not been created", doneFile.exists());
78
79 }
80 public static void writeSeqFile(Configuration conf, FileSystem fileSys, Path dest,
81 List<ChunkImpl> chunks) throws IOException {
82 FSDataOutputStream out = fileSys.create(dest);
83
84 Calendar calendar = Calendar.getInstance();
85 SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
86 ChukwaArchiveKey.class, ChunkImpl.class,
87 SequenceFile.CompressionType.NONE, null);
88
89 for (ChunkImpl chunk: chunks) {
90 ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
91
92 calendar.set(Calendar.SECOND, 0);
93 calendar.set(Calendar.MILLISECOND, 0);
94 archiveKey.setTimePartition(calendar.getTimeInMillis());
95
96 archiveKey.setDataType(chunk.getDataType());
97 archiveKey.setStreamName(chunk.getStreamName());
98 archiveKey.setSeqId(chunk.getSeqID());
99 seqFileWriter.append(archiveKey, chunk);
100 }
101 seqFileWriter.close();
102 out.close();
103 }
104 protected void tearDown() {
105 if (doneFile != null && doneFile.exists()){
106 doneFile.delete();
107 } else {
108 File recoverFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recover"));
109 if (recoverFile.exists()){
110 recoverFile.delete();
111 } else {
112 File recoverDoneFile = new File(tempDir.getAbsolutePath()+"/"+tempFileName.replace(".chukwa", ".recoverDone"));
113 if (recoverDoneFile.exists()){
114 recoverDoneFile.delete();
115 }
116 }
117 }
118 }
119 }