View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import junit.framework.Assert;
27  import junit.framework.TestCase;
28  
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
31  
32  public class TestDemuxManager extends TestCase {
33  
34    /**
35     * Standard workflow
36     */
37    public void testScenario1() {
38      ChukwaConfiguration cc = new ChukwaConfiguration();
39      String tempDirectory = System.getProperty("test.build.data", "/tmp");
40      String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
41      
42      cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
43      cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
44      cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
45      cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
46      cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
47      
48       
49      try {
50        
51        File dataSinkDirectory = new File(chukwaRootDir +"/logs");
52        dataSinkDirectory.mkdirs();
53        File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink1.done");
54        dataSinkFile.createNewFile();
55        
56        DemuxManagerScenario dm = new DemuxManagerScenario1(cc,0);  
57        dm.start();
58        
59        List<String> requireActions = new ArrayList<String>();
60        requireActions.add("checkDemuxOutputDir:false");
61        requireActions.add("checkDemuxInputDir:false");
62        requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
63        requireActions.add("runDemux:true");
64        requireActions.add("checkDemuxOutputDir:false");
65        requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
66        requireActions.add("processData done");
67        
68        List<String> actions = dm.actions;
69       
70        Assert.assertTrue(requireActions.size() == actions.size());
71        
72        for(int i=0;i<requireActions.size();i++) {
73          Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
74        }
75  
76      } catch (Exception e) {
77        e.printStackTrace();
78       Assert.fail();
79      }
80      finally {
81        deleteDirectory(new File(chukwaRootDir));
82      }
83    }
84    
85    /**
86     * No dataSink file at  startup
87     * Add one later
88     */
89    public void testScenario2() {
90      ChukwaConfiguration cc = new ChukwaConfiguration();
91      String tempDirectory = System.getProperty("test.build.data", "/tmp");
92      String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
93      
94      cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
95      cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
96      cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
97      cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
98      cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
99  
100     try {
101       DemuxManagerScenario dm = new DemuxManagerScenario2(cc);  
102       dm.start();
103  
104       List<String> requireActions = new ArrayList<String>();
105       requireActions.add("checkDemuxOutputDir:false");
106       requireActions.add("checkDemuxInputDir:false");
107       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:false");
108       requireActions.add("checkDemuxOutputDir:false");
109       requireActions.add("checkDemuxInputDir:false");
110       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
111       requireActions.add("runDemux:true");
112       requireActions.add("checkDemuxOutputDir:false");
113       requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
114       requireActions.add("processData done");
115 
116       List<String> actions = dm.actions;
117       Assert.assertTrue(requireActions.size() == actions.size());
118       
119       for(int i=0;i<requireActions.size();i++) {
120         Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
121       }
122 
123     } catch (Exception e) {
124       e.printStackTrace();
125       Assert.fail();
126     }
127     finally {
128       deleteDirectory(new File(chukwaRootDir));
129     }
130   }
131   
132   
133   /**
134    * DataSink file in dataSink directory
135    * MR_INPUT_DIR already there
136    * MR_INPUT_DIR should be reprocessed first
137    * Then dataSink file
138    */
139   public void testScenario3() {
140     ChukwaConfiguration cc = new ChukwaConfiguration();
141     String tempDirectory = System.getProperty("test.build.data", "/tmp");
142     String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
143     
144     cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
145     cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
146     cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
147     cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
148     cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
149 
150     try {
151       File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
152       mrInputDir.mkdirs();
153       File dataSinkDirectory = new File(chukwaRootDir +"/logs");
154       dataSinkDirectory.mkdirs();
155       File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink3.done");
156       dataSinkFile.createNewFile();
157       
158       DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);  
159       dm.start();
160  
161       List<String> requireActions = new ArrayList<String>();
162      
163       // DEMUX_INPUT_DIR reprocessing
164       requireActions.add("checkDemuxOutputDir:false");
165       requireActions.add("checkDemuxInputDir:true");
166       requireActions.add("runDemux:true");
167       requireActions.add("checkDemuxOutputDir:false");
168       requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
169       requireActions.add("processData done");
170 
171       // dataSink3.done processing
172       requireActions.add("checkDemuxOutputDir:false"); 
173       requireActions.add("checkDemuxInputDir:false"); 
174       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
175       requireActions.add("runDemux:true"); 
176       requireActions.add("checkDemuxOutputDir:false"); 
177       requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
178       requireActions.add("processData done");
179       
180       List<String> actions = dm.actions;
181       Assert.assertTrue(requireActions.size() == actions.size());
182       
183       for (int i=0;i<requireActions.size();i++) {
184         Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
185       }
186 
187     } catch (Exception e) {
188       e.printStackTrace();
189       Assert.fail();
190     }
191     finally {
192       deleteDirectory(new File(chukwaRootDir));
193     }
194   }
195   
196   
197   /**
198    * DataSink file in dataSink directory
199    * MR_INPUT_DIR already there
200    * MR_OUTPUT_DIR already there
201    * MR_OUTPUT_DIR should be deleted first
202    * Then MR_INPUT_DIR should be reprocessed
203    * Then dataSink file
204    */
205   public void testScenario4() {
206     ChukwaConfiguration cc = new ChukwaConfiguration();
207     String tempDirectory = System.getProperty("test.build.data", "/tmp");
208     String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
209     
210     cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
211     cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
212     cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
213     cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
214     cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
215 
216     try {
217       File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
218       mrInputDir.mkdirs();
219       File mrOutputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
220       mrOutputDir.mkdirs();
221       
222       File dataSinkDirectory = new File(chukwaRootDir +"/logs");
223       dataSinkDirectory.mkdirs();
224       File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink4.done");
225       dataSinkFile.createNewFile();
226       
227       DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);  
228       dm.start();
229  
230       List<String> requireActions = new ArrayList<String>();
231 
232       requireActions.add("checkDemuxOutputDir:true"); 
233       requireActions.add("deleteDemuxOutputDir:true");
234       requireActions.add("checkDemuxOutputDir:false");
235       requireActions.add("checkDemuxInputDir:true");
236       requireActions.add("runDemux:true"); 
237       requireActions.add("checkDemuxOutputDir:false"); 
238       requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
239       requireActions.add("processData done");
240 
241       // dataSink4.done processing
242       requireActions.add("checkDemuxOutputDir:false"); 
243       requireActions.add("checkDemuxInputDir:false"); 
244       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
245       requireActions.add("runDemux:true"); 
246       requireActions.add("checkDemuxOutputDir:false"); 
247       requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
248       requireActions.add("processData done");
249             
250       List<String> actions = dm.actions;
251       Assert.assertTrue(requireActions.size() == actions.size());
252       
253       for(int i=0;i<requireActions.size();i++) {
254         Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
255       }
256 
257     } catch (Exception e) {
258       e.printStackTrace();
259       Assert.fail();
260     }
261     finally {
262       deleteDirectory(new File(chukwaRootDir));
263     }
264   }
265   
266   /**
267    * DataSink file in dataSinkDir
268    * Demux fails 3 times
269    * Add a new DataSink file to dataSinkDir
270    * Demux succeed
271    */
272   public void testScenario5() {
273     ChukwaConfiguration cc = new ChukwaConfiguration();
274     String tempDirectory = System.getProperty("test.build.data", "/tmp");
275     String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
276     
277     cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
278     cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
279     cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
280     cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
281     cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
282 
283     try {
284       File dataSinkDirectory = new File(chukwaRootDir +"/logs");
285       dataSinkDirectory.mkdirs();
286       File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink5-0.done");
287       dataSinkFile.createNewFile();
288       
289       DemuxManagerScenario dm = new DemuxManagerScenario5(cc);  
290       dm.start();
291  
292       List<String> requireActions = new ArrayList<String>();
293       
294       // Move dataSink & process
295       requireActions.add("checkDemuxOutputDir:false"); 
296       requireActions.add("checkDemuxInputDir:false"); 
297       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
298       requireActions.add("runDemux:false"); 
299       requireActions.add("processData done"); 
300       
301       // Reprocess 1
302       requireActions.add("checkDemuxOutputDir:true"); 
303       requireActions.add("deleteDemuxOutputDir:true");
304       requireActions.add("checkDemuxOutputDir:false"); 
305       requireActions.add("checkDemuxInputDir:true"); 
306       requireActions.add("runDemux:false"); 
307       requireActions.add("processData done"); 
308       
309       // Reprocess 2
310       requireActions.add("checkDemuxOutputDir:true"); 
311       requireActions.add("deleteDemuxOutputDir:true");
312       requireActions.add("checkDemuxOutputDir:false"); 
313       requireActions.add("checkDemuxInputDir:true"); 
314       requireActions.add("runDemux:false"); 
315       requireActions.add("processData done"); 
316       
317       // Reprocess3
318       requireActions.add("checkDemuxOutputDir:true"); 
319       requireActions.add("deleteDemuxOutputDir:true");
320       requireActions.add("checkDemuxOutputDir:false"); 
321       requireActions.add("checkDemuxInputDir:true"); 
322       requireActions.add("runDemux:false"); 
323       requireActions.add("processData done"); 
324       requireActions.add("checkDemuxOutputDir:true"); 
325       requireActions.add("deleteDemuxOutputDir:true");
326       requireActions.add("checkDemuxOutputDir:false");
327       requireActions.add("checkDemuxInputDir:true");
328       requireActions.add("moveDataSinkFilesToDemuxErrorDirectory:true");
329      
330       requireActions.add("checkDemuxOutputDir:false"); 
331       requireActions.add("checkDemuxInputDir:false"); 
332       requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
333       requireActions.add("runDemux:true"); 
334       requireActions.add("checkDemuxOutputDir:true");
335       requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true"); 
336       requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
337       requireActions.add("processData done");
338       
339       
340       List<String> actions = dm.actions;
341       Assert.assertTrue(requireActions.size() == actions.size());
342       
343       for(int i=0;i<requireActions.size();i++) {
344         Assert.assertTrue( i + " - " + requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
345       }
346 
347     } catch (Exception e) {
348       e.printStackTrace();
349       Assert.fail();
350     }
351     finally {
352       deleteDirectory(new File(chukwaRootDir));
353     }
354   }
355   
356   
357   /**
358    * Standard workflow with MR_OUTPUT
359    */
360   public void testScenario6() {
361     ChukwaConfiguration cc = new ChukwaConfiguration();
362     String tempDirectory = System.getProperty("test.build.data", "/tmp");
363     String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
364     
365     
366     cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
367     cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
368     cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
369     cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
370     cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
371     
372     try {
373       
374       File dataSinkDirectory = new File(chukwaRootDir +"/logs");
375       dataSinkDirectory.mkdirs();
376       File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink6.done");
377       dataSinkFile.createNewFile();
378       
379       DemuxManagerScenario dm = new DemuxManagerScenario6(cc,3);  
380       dm.start();
381       
382       List<String> requireActions = new ArrayList<String>();
383       for (int i=0;i<3;i++) {
384         requireActions.add("checkDemuxOutputDir:false");
385         requireActions.add("checkDemuxInputDir:false");
386         requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
387         requireActions.add("runDemux:true");
388         requireActions.add("checkDemuxOutputDir:true");
389         requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true");
390         requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
391         requireActions.add("processData done");   
392       }
393      
394       List<String> actions = dm.actions;
395       Assert.assertTrue(requireActions.size() == actions.size());
396       
397       for(int i=0;i<requireActions.size();i++) {
398         Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
399       }
400 
401       
402     } catch (Exception e) {
403       e.printStackTrace();
404      Assert.fail();
405     }
406     finally {
407       deleteDirectory(new File(chukwaRootDir));
408     }
409     
410   }
411   
412   
413   static public boolean deleteDirectory(File path) {
414     if( path.exists() ) {
415       File[] files = path.listFiles();
416       for(int i=0; i<files.length; i++) {
417          if(files[i].isDirectory()) {
418            deleteDirectory(files[i]);
419          }
420          else {
421            files[i].delete();
422          }
423       }
424     }
425     return( path.delete() );
426   }
427 
428      /////////////////////////\
429     //// HELPER CLASSES /////  \
430    /////////////////////////____\
431   
432   private static class DemuxManagerScenario6 extends DemuxManagerScenario {
433     int count = 0;
434     public DemuxManagerScenario6(ChukwaConfiguration conf, int count) throws Exception {
435       super(conf);
436       this.count = count;
437     }
438     @Override
439     public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
440       boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
441       
442       try {
443         // Create DEMUX_OUTPOUT
444         String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
445         File mrOutputDir = new File(chukwaRootDir  + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
446         mrOutputDir.mkdirs();
447         mrOutputDir.deleteOnExit();
448         
449         // ADD DATASINK FILE
450         File dataSinkDirectory = new File(chukwaRootDir +"logs");
451         dataSinkDirectory.mkdirs();
452         File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink6-" + count + ".done");
453         dataSinkFile.createNewFile();
454         
455       }catch(Exception e) {
456         throw new RuntimeException(e);
457       }
458 
459       count --;
460       if (count <= 0) {
461         this.isRunning = false;
462       }
463      
464       return res;
465     }
466   }
467   
468   
469   private static class DemuxManagerScenario5 extends DemuxManagerScenario {
470     public DemuxManagerScenario5(ChukwaConfiguration conf) throws Exception {
471       super(conf);
472     }
473     
474     boolean errorDone = false;
475     public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
476         String demuxErrorDir) throws IOException {
477       boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
478       
479       String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
480       File dataSinkDirectory = new File(chukwaRootDir +"logs");
481       dataSinkDirectory.mkdirs();
482       dataSinkDirectory.deleteOnExit();
483       File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink5-1.done");
484       dataSinkFile.createNewFile();
485       dataSinkFile.deleteOnExit();       
486   
487       errorDone = true;
488       return res;
489     }
490     
491     int counter = 0;
492     @Override
493     public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
494       if (errorDone && counter >= 4) {
495         this.isRunning = false;
496       } 
497       
498       // Create DEMUX_OUTPOUT
499       String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
500       File mrOutputDir = new File(chukwaRootDir  + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
501       mrOutputDir.mkdirs();
502       mrOutputDir.deleteOnExit();
503       
504       counter ++;
505       this.actions.add("runDemux:" + errorDone);
506       return errorDone;
507     }
508   }
509   
510   private static class DemuxManagerScenario2 extends DemuxManagerScenario {
511     public DemuxManagerScenario2(ChukwaConfiguration conf) throws Exception {
512       super(conf);
513     }
514     
515     int counter = 0;
516     @Override
517     public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
518         String demuxInputDir) throws IOException {
519       if (counter == 1) {
520         String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
521         File dataSinkDirectory = new File(chukwaRootDir +"logs");
522         dataSinkDirectory.mkdirs();
523         dataSinkDirectory.deleteOnExit();
524         File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink2.done");
525         dataSinkFile.createNewFile();
526         dataSinkFile.deleteOnExit();
527       }
528 
529       counter ++;
530       return super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
531     }
532     
533    
534     @Override
535     public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
536       boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
537       if (counter > 1) {
538         this.isRunning = false;
539       }
540       return res;
541     }
542   }
543   
544   private static class DemuxManagerScenario1 extends DemuxManagerScenario {
545     int count = 0;
546     public DemuxManagerScenario1(ChukwaConfiguration conf, int count) throws Exception {
547       super(conf);
548       this.count = count;
549     }
550     @Override
551     public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
552       boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
553       count --;
554       if (count <= 0) {
555         this.isRunning = false;
556       }
557      
558       return res;
559     }
560   }
561   
562   private static class DemuxManagerScenario extends DemuxManager {
563     public List<String>actions = new ArrayList<String>();
564  
565     public DemuxManagerScenario(ChukwaConfiguration conf) throws Exception {
566       super(conf);
567       NO_DATASINK_SLEEP_TIME = 5;
568     }
569 
570     @Override
571     public boolean checkDemuxInputDir(String demuxInputDir) throws IOException {
572       boolean res = super.checkDemuxInputDir(demuxInputDir);
573       this.actions.add("checkDemuxInputDir:" + res);
574       return res;
575     }
576 
577     @Override
578     public boolean checkDemuxOutputDir(String demuxOutputDir)
579         throws IOException {
580       boolean res = super.checkDemuxOutputDir(demuxOutputDir);
581       this.actions.add("checkDemuxOutputDir:" + res);
582       return res;
583     }
584 
585     @Override
586     public boolean moveDataSinkFilesToArchiveDirectory(String demuxInputDir,
587         String archiveDirectory) throws IOException {
588       boolean res = super.moveDataSinkFilesToArchiveDirectory(demuxInputDir,
589           archiveDirectory);
590       this.actions.add("moveDataSinkFilesToArchiveDirectory:" + res);
591       return res;
592     }
593 
594     @Override
595     public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
596         String demuxErrorDir) throws IOException {
597       boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
598       this.actions.add("moveDataSinkFilesToDemuxErrorDirectory:" + res);
599       return res;
600     }
601 
602     @Override
603     public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
604         String demuxInputDir) throws IOException {
605       boolean res = super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
606       this.actions.add("moveDataSinkFilesToDemuxInputDirectory:" + res);
607       return res;
608     }
609 
610     @Override
611     public boolean moveDemuxOutputDirToPostProcessDirectory(
612         String demuxOutputDir, String postProcessDirectory) throws IOException {
613       boolean res = super.moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir,
614           postProcessDirectory);
615       this.actions.add("moveDemuxOutputDirToPostProcessDirectory:" + res);
616       return res;
617     }
618 
619     @Override
620     public boolean processData(String dataSinkDir, String demuxInputDir,
621         String demuxOutputDir, String postProcessDir, String archiveDir)
622         throws IOException {
623       boolean res = super.processData(dataSinkDir, demuxInputDir, demuxOutputDir, postProcessDir,
624           archiveDir);
625       this.actions.add("processData done");
626       return res;
627     }
628 
629     @Override
630     public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
631       boolean res = true;
632       this.actions.add("runDemux:" + res);
633       return res;
634     }
635 
636     @Override
637     public boolean deleteDemuxOutputDir(String demuxOutputDir)
638         throws IOException {
639       boolean res = super.deleteDemuxOutputDir(demuxOutputDir);
640       this.actions.add("deleteDemuxOutputDir:" + res);
641       return res;
642     }
643     
644   }
645 
646 }