View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.archive;
19  
20  import java.io.IOException;
21  import java.util.Calendar;
22  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
23  import org.apache.hadoop.chukwa.ChunkImpl;
24  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FSDataOutputStream;
27  import org.apache.hadoop.fs.FileStatus;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hdfs.MiniDFSCluster;
31  import org.apache.hadoop.io.SequenceFile;
32  import org.apache.hadoop.mapred.*;
33  import org.apache.hadoop.util.ToolRunner;
34  import junit.framework.TestCase;
35  import static org.apache.hadoop.chukwa.util.TempFileUtil.writeASinkFile;
36  
37  public class TestArchive extends TestCase {
38  
39    
40     public void browseDir(FileSystem fs, Path p, int d) throws IOException {
41       for(int i=0; i< d; ++i) {
42         System.out.print(" |");
43       }
44       FileStatus stat = fs.getFileStatus(p);
45       if(stat.isDir()) {
46         System.out.println(" \\ " + p.getName());
47         FileStatus[] files = fs.listStatus(p);
48         for(FileStatus f: files) {
49           browseDir(fs, f.getPath(), d+1);
50         }
51       }
52       else
53         System.out.println( p.getName() );
54     }
55  
56    static final int NUM_HADOOP_SLAVES = 1;
57    static final Path DATASINK = new Path("/chukwa/logs/*");
58    static final Path DATASINKFILE = new Path("/chukwa/logs/foobar.done");
59    static final Path DATASINK_NOTDONE = new Path("/chukwa/logs/foo.chukwa");
60    static final Path DEST_FILE = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29.arc");
61    static final Path MERGED_DATASINK = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29-0.arc");
62    static final Path OUTPUT_DIR = new Path("/chukwa/archive/");
63    static final int CHUNKCOUNT = 1000;
64    
65  
66    
67    /**
68     * Writes a single chunk to a file, checks that archiver delivers it
69     * to an archive file with correct filename.
70     */
71    public void testArchiving() throws Exception {
72      FileSystem fileSys;
73      MiniMRCluster mr;
74      JobConf jc ;
75      
76      System.out.println("starting archive test");
77      Configuration conf = new Configuration();
78      
79      conf.setInt("io.sort.mb", 1);
80      conf.setInt("io.sort.factor", 5);
81      conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
82      conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
83      conf.set(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME, "true");
84      
85      System.setProperty("hadoop.log.dir", System.getProperty(
86          "test.build.data", "/tmp"));
87  
88      MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
89          null);
90      fileSys = dfs.getFileSystem();
91      conf.set("fs.default.name", fileSys.getUri().toString());
92      
93      System.out.println("filesystem is " + fileSys.getUri());
94  
95      
96      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
97          .toString(), 1);
98      jc = mr.createJobConf(new JobConf(conf));
99     
100 
101     fileSys.delete(new Path("/chukwa"), true);//nuke sink
102 
103     writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
104     
105     FileStatus fstat = fileSys.getFileStatus(DATASINKFILE);
106     long dataLen = fstat.getLen();
107     assertTrue(dataLen > CHUNKCOUNT * 50);
108     
109     String[] archiveArgs = {"DataType", fileSys.getUri().toString() + DATASINK.toString(),
110         fileSys.getUri().toString() +OUTPUT_DIR.toString() };
111     
112     assertEquals("true", jc.get("archive.groupByClusterName"));
113     assertEquals(1, jc.getInt("io.sort.mb", 5));
114     
115     int returnVal = ToolRunner.run(jc,  new ChukwaArchiveBuilder(), archiveArgs);
116     assertEquals(0, returnVal);
117     fstat = fileSys.getFileStatus(DEST_FILE);
118     assertEquals(dataLen, fstat.getLen());    
119     
120     Thread.sleep(1000);
121     
122     SinkArchiver a = new SinkArchiver();
123     fileSys.delete(new Path("/chukwa"), true);
124 
125     writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
126     writeASinkFile(jc, fileSys, DATASINK_NOTDONE, 50);
127     writeASinkFile(jc, fileSys, DEST_FILE, 10);
128     
129     long doneLen = fileSys.getFileStatus(DATASINKFILE).getLen();
130     long notDoneLen = fileSys.getFileStatus(DATASINK_NOTDONE).getLen();
131     long archFileLen = fileSys.getFileStatus(DEST_FILE).getLen();
132 
133     //we now have three files: one closed datasink, one "unfinished" datasink,
134     //and one archived.  After merge, should have two datasink files,
135     //plus the "unfinished" datasink
136     
137     a.exec(fileSys, jc);
138 
139     browseDir(fileSys, new Path("/"), 0);    //OUTPUT_DIR, 0);
140     
141       //make sure we don't scramble anything
142     assertEquals(notDoneLen, fileSys.getFileStatus(DATASINK_NOTDONE).getLen());
143     assertEquals(archFileLen, fileSys.getFileStatus(DEST_FILE).getLen());
144     //and make sure promotion worked right
145 
146     assertEquals(doneLen, fileSys.getFileStatus(MERGED_DATASINK).getLen());
147     mr.shutdown();
148     dfs.shutdown();
149     
150   }
151   
152 }