1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.archive;
19
20 import java.io.IOException;
21 import java.util.Calendar;
22 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
23 import org.apache.hadoop.chukwa.ChunkImpl;
24 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FSDataOutputStream;
27 import org.apache.hadoop.fs.FileStatus;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hdfs.MiniDFSCluster;
31 import org.apache.hadoop.io.SequenceFile;
32 import org.apache.hadoop.mapred.*;
33 import org.apache.hadoop.util.ToolRunner;
34 import junit.framework.TestCase;
35 import static org.apache.hadoop.chukwa.util.TempFileUtil.writeASinkFile;
36
37 public class TestArchive extends TestCase {
38
39
40 public void browseDir(FileSystem fs, Path p, int d) throws IOException {
41 for(int i=0; i< d; ++i) {
42 System.out.print(" |");
43 }
44 FileStatus stat = fs.getFileStatus(p);
45 if(stat.isDir()) {
46 System.out.println(" \\ " + p.getName());
47 FileStatus[] files = fs.listStatus(p);
48 for(FileStatus f: files) {
49 browseDir(fs, f.getPath(), d+1);
50 }
51 }
52 else
53 System.out.println( p.getName() );
54 }
55
56 static final int NUM_HADOOP_SLAVES = 1;
57 static final Path DATASINK = new Path("/chukwa/logs/*");
58 static final Path DATASINKFILE = new Path("/chukwa/logs/foobar.done");
59 static final Path DATASINK_NOTDONE = new Path("/chukwa/logs/foo.chukwa");
60 static final Path DEST_FILE = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29.arc");
61 static final Path MERGED_DATASINK = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29-0.arc");
62 static final Path OUTPUT_DIR = new Path("/chukwa/archive/");
63 static final int CHUNKCOUNT = 1000;
64
65
66
67
68
69
70
71 public void testArchiving() throws Exception {
72 FileSystem fileSys;
73 MiniMRCluster mr;
74 JobConf jc ;
75
76 System.out.println("starting archive test");
77 Configuration conf = new Configuration();
78
79 conf.setInt("io.sort.mb", 1);
80 conf.setInt("io.sort.factor", 5);
81 conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
82 conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
83 conf.set(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME, "true");
84
85 System.setProperty("hadoop.log.dir", System.getProperty(
86 "test.build.data", "/tmp"));
87
88 MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
89 null);
90 fileSys = dfs.getFileSystem();
91 conf.set("fs.default.name", fileSys.getUri().toString());
92
93 System.out.println("filesystem is " + fileSys.getUri());
94
95
96 mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
97 .toString(), 1);
98 jc = mr.createJobConf(new JobConf(conf));
99
100
101 fileSys.delete(new Path("/chukwa"), true);
102
103 writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
104
105 FileStatus fstat = fileSys.getFileStatus(DATASINKFILE);
106 long dataLen = fstat.getLen();
107 assertTrue(dataLen > CHUNKCOUNT * 50);
108
109 String[] archiveArgs = {"DataType", fileSys.getUri().toString() + DATASINK.toString(),
110 fileSys.getUri().toString() +OUTPUT_DIR.toString() };
111
112 assertEquals("true", jc.get("archive.groupByClusterName"));
113 assertEquals(1, jc.getInt("io.sort.mb", 5));
114
115 int returnVal = ToolRunner.run(jc, new ChukwaArchiveBuilder(), archiveArgs);
116 assertEquals(0, returnVal);
117 fstat = fileSys.getFileStatus(DEST_FILE);
118 assertEquals(dataLen, fstat.getLen());
119
120 Thread.sleep(1000);
121
122 SinkArchiver a = new SinkArchiver();
123 fileSys.delete(new Path("/chukwa"), true);
124
125 writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
126 writeASinkFile(jc, fileSys, DATASINK_NOTDONE, 50);
127 writeASinkFile(jc, fileSys, DEST_FILE, 10);
128
129 long doneLen = fileSys.getFileStatus(DATASINKFILE).getLen();
130 long notDoneLen = fileSys.getFileStatus(DATASINK_NOTDONE).getLen();
131 long archFileLen = fileSys.getFileStatus(DEST_FILE).getLen();
132
133
134
135
136
137 a.exec(fileSys, jc);
138
139 browseDir(fileSys, new Path("/"), 0);
140
141
142 assertEquals(notDoneLen, fileSys.getFileStatus(DATASINK_NOTDONE).getLen());
143 assertEquals(archFileLen, fileSys.getFileStatus(DEST_FILE).getLen());
144
145
146 assertEquals(doneLen, fileSys.getFileStatus(MERGED_DATASINK).getLen());
147 mr.shutdown();
148 dfs.shutdown();
149
150 }
151
152 }