View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.util.Map;
24  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
25  import org.apache.hadoop.conf.*;
26  import org.apache.log4j.Level;
27  import org.apache.log4j.Logger;
28  import junit.framework.TestCase;
29  
30  public class TestDirTailingAdaptor extends TestCase {
31  
32    ChukwaAgent agent;
33    File baseDir;
34    static final int SCAN_INTERVAL = 1000;
35    
36    /**
37     * This test is exactly the same as testDirTailed except that it applies filtering and<br/>
38     * creates a file that should not be read inorder to test the filter.
39     * @throws IOException
40     * @throws ChukwaAgent.AlreadyRunningException
41     * @throws InterruptedException
42     */
43    public void testDirTailerFiltering() throws IOException,
44    ChukwaAgent.AlreadyRunningException, InterruptedException {
45      
46      DirTailingAdaptor.log.setLevel(Level.DEBUG);
47      
48      Configuration conf = new Configuration();
49      baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
50      File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
51      createEmptyDir(checkpointDir);
52      
53      conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
54      conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
55      conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
56      conf.setInt("chukwaAgent.control.port", 0);
57      conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
58      
59      agent = new ChukwaAgent(conf);
60      File emptyDir = new File(baseDir, "emptyDir2");
61      createEmptyDir(emptyDir);
62      
63      assertEquals(0, agent.adaptorCount());
64      //check filtering with empty directory
65      agent.processAddCommand("add emptydir2= DirTailingAdaptor raw " + emptyDir + " *file filetailer.CharFileTailingAdaptorUTF8 0");
66      assertEquals(1, agent.adaptorCount());
67  
68      File dirWithFile = new File(baseDir, "dir3");
69      dirWithFile.delete();
70      assertFalse("temp directory not empty",dirWithFile.exists());
71        
72      //this file should be found by the filter
73      dirWithFile.mkdir();
74      File inDir = File.createTempFile("atemp", "file", dirWithFile);
75      inDir.deleteOnExit();
76      //This file should not be found by the filter
77      File noreadFile = File.createTempFile("atemp", "noread", dirWithFile);
78      noreadFile.deleteOnExit();
79      
80      //apply filter *file
81      agent.processAddCommand("add dir3= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
82      Thread.sleep(3000);
83      assertEquals(3, agent.adaptorCount());
84      agent.shutdown();
85  
86      conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
87      Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
88      File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
89      File aNewFile = File.createTempFile("new", "file", dirWithFile);
90      anOldFile.deleteOnExit();
91      aNewFile.deleteOnExit();
92      anOldFile.setLastModified(10);//just after epoch
93      agent = new ChukwaAgent(conf); //restart agent.
94      
95     Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
96     assertTrue(aNewFile.exists());
97     
98      //make sure we started tailing the new, not the old, file.
99      for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
100       System.out.println(adaptors.getKey() +": " + adaptors.getValue());
101       assertFalse(adaptors.getValue().contains("oldXYZ"));
102     }
103     Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
104     //should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
105     //and FileTailers for File inDir and file newfile and not the noread file.
106     assertEquals(4, agent.adaptorCount());
107     agent.shutdown();
108     Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
109     
110     nukeDirContents(checkpointDir);//nuke dir
111     checkpointDir.delete();
112     emptyDir.delete();
113     nukeDirContents(dirWithFile);
114     dirWithFile.delete();
115   }
116 
117   
118   public void testDirTailer() throws IOException,
119   ChukwaAgent.AlreadyRunningException, InterruptedException {
120     
121     DirTailingAdaptor.log.setLevel(Level.DEBUG);
122     
123     Configuration conf = new Configuration();
124     baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
125     File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
126     createEmptyDir(checkpointDir);
127     
128     conf.setInt("adaptor.dirscan.intervalMs", SCAN_INTERVAL);
129     conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
130     conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
131     conf.setInt("chukwaAgent.control.port", 0);
132     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
133    
134     boolean retry = true; 
135     while(retry) {
136       try {
137         retry = false;
138         agent = new ChukwaAgent(conf);
139       } catch(Exception e) {
140         retry = true;
141       }
142     }
143     File emptyDir = new File(baseDir, "emptyDir");
144     createEmptyDir(emptyDir);
145     
146     assertEquals(0, agent.adaptorCount());
147     agent.processAddCommand("add emptydir= DirTailingAdaptor raw " + emptyDir + " filetailer.CharFileTailingAdaptorUTF8 0");
148     assertEquals(1, agent.adaptorCount());
149 
150     File dirWithFile = new File(baseDir, "dir2");
151     dirWithFile.delete();
152     assertFalse("temp directory not empty",dirWithFile.exists());
153       
154     dirWithFile.mkdir();
155     File inDir = File.createTempFile("atemp", "file", dirWithFile);
156     inDir.deleteOnExit();
157     agent.processAddCommand("add dir2= DirTailingAdaptor raw " + dirWithFile + " *file filetailer.CharFileTailingAdaptorUTF8 0");
158     Thread.sleep(3000);
159     assertEquals(3, agent.adaptorCount());
160     System.out.println("DirTailingAdaptor looks OK before restart");
161     agent.shutdown();
162 
163     conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
164     Thread.sleep(1500); //wait a little bit to make sure new file ts is > last checkpoint time.
165     File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
166     File aNewFile = File.createTempFile("new", "file", dirWithFile);
167     anOldFile.deleteOnExit();
168     aNewFile.deleteOnExit();
169     anOldFile.setLastModified(10);//just after epoch
170     agent = new ChukwaAgent(conf); //restart agent.
171     
172    Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
173    assertTrue(aNewFile.exists());
174    
175     //make sure we started tailing the new, not the old, file.
176     for(Map.Entry<String, String> adaptors : agent.getAdaptorList().entrySet()) {
177       System.out.println(adaptors.getKey() +": " + adaptors.getValue());
178       assertFalse(adaptors.getValue().contains("oldXYZ"));
179     }
180     //should be four adaptors: the DirTailer on emptyDir, the DirTailer on the full dir,
181     //and FileTailers for File inDir and file newfile
182     Thread.sleep(3 * SCAN_INTERVAL); //wait a bit for the new file to be detected.
183     assertEquals(4, agent.adaptorCount());
184     agent.shutdown();
185     
186     nukeDirContents(checkpointDir);//nuke dir
187     checkpointDir.delete();
188     emptyDir.delete();
189     nukeDirContents(dirWithFile);
190     dirWithFile.delete();
191   }
192 
193   //returns true if dir exists
194   public static boolean nukeDirContents(File dir) {
195     if(dir.exists()) {
196       if(dir.isDirectory()) {
197         for(File f: dir.listFiles()) {
198           nukeDirContents(f);
199           f.delete();
200         }
201       } else
202         dir.delete();
203       
204       return true;
205     }
206     return false;
207   }
208   
209   public static void createEmptyDir(File dir) {
210     if(!nukeDirContents(dir))
211       dir.mkdir();
212     assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
213   }
214 
215 }