1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import junit.framework.Assert;
27 import junit.framework.TestCase;
28
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
31
32 public class TestDemuxManager extends TestCase {
33
34
35
36
37 public void testScenario1() {
38 ChukwaConfiguration cc = new ChukwaConfiguration();
39 String tempDirectory = System.getProperty("test.build.data", "/tmp");
40 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
41
42 cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
43 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
44 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
45 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
46 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
47
48
49 try {
50
51 File dataSinkDirectory = new File(chukwaRootDir +"/logs");
52 dataSinkDirectory.mkdirs();
53 File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink1.done");
54 dataSinkFile.createNewFile();
55
56 DemuxManagerScenario dm = new DemuxManagerScenario1(cc,0);
57 dm.start();
58
59 List<String> requireActions = new ArrayList<String>();
60 requireActions.add("checkDemuxOutputDir:false");
61 requireActions.add("checkDemuxInputDir:false");
62 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
63 requireActions.add("runDemux:true");
64 requireActions.add("checkDemuxOutputDir:false");
65 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
66 requireActions.add("processData done");
67
68 List<String> actions = dm.actions;
69
70 Assert.assertTrue(requireActions.size() == actions.size());
71
72 for(int i=0;i<requireActions.size();i++) {
73 Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
74 }
75
76 } catch (Exception e) {
77 e.printStackTrace();
78 Assert.fail();
79 }
80 finally {
81 deleteDirectory(new File(chukwaRootDir));
82 }
83 }
84
85
86
87
88
89 public void testScenario2() {
90 ChukwaConfiguration cc = new ChukwaConfiguration();
91 String tempDirectory = System.getProperty("test.build.data", "/tmp");
92 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
93
94 cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
95 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
96 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
97 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
98 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
99
100 try {
101 DemuxManagerScenario dm = new DemuxManagerScenario2(cc);
102 dm.start();
103
104 List<String> requireActions = new ArrayList<String>();
105 requireActions.add("checkDemuxOutputDir:false");
106 requireActions.add("checkDemuxInputDir:false");
107 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:false");
108 requireActions.add("checkDemuxOutputDir:false");
109 requireActions.add("checkDemuxInputDir:false");
110 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
111 requireActions.add("runDemux:true");
112 requireActions.add("checkDemuxOutputDir:false");
113 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
114 requireActions.add("processData done");
115
116 List<String> actions = dm.actions;
117 Assert.assertTrue(requireActions.size() == actions.size());
118
119 for(int i=0;i<requireActions.size();i++) {
120 Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
121 }
122
123 } catch (Exception e) {
124 e.printStackTrace();
125 Assert.fail();
126 }
127 finally {
128 deleteDirectory(new File(chukwaRootDir));
129 }
130 }
131
132
133
134
135
136
137
138
139 public void testScenario3() {
140 ChukwaConfiguration cc = new ChukwaConfiguration();
141 String tempDirectory = System.getProperty("test.build.data", "/tmp");
142 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
143
144 cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
145 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
146 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
147 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
148 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
149
150 try {
151 File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
152 mrInputDir.mkdirs();
153 File dataSinkDirectory = new File(chukwaRootDir +"/logs");
154 dataSinkDirectory.mkdirs();
155 File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink3.done");
156 dataSinkFile.createNewFile();
157
158 DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);
159 dm.start();
160
161 List<String> requireActions = new ArrayList<String>();
162
163
164 requireActions.add("checkDemuxOutputDir:false");
165 requireActions.add("checkDemuxInputDir:true");
166 requireActions.add("runDemux:true");
167 requireActions.add("checkDemuxOutputDir:false");
168 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
169 requireActions.add("processData done");
170
171
172 requireActions.add("checkDemuxOutputDir:false");
173 requireActions.add("checkDemuxInputDir:false");
174 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
175 requireActions.add("runDemux:true");
176 requireActions.add("checkDemuxOutputDir:false");
177 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
178 requireActions.add("processData done");
179
180 List<String> actions = dm.actions;
181 Assert.assertTrue(requireActions.size() == actions.size());
182
183 for (int i=0;i<requireActions.size();i++) {
184 Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
185 }
186
187 } catch (Exception e) {
188 e.printStackTrace();
189 Assert.fail();
190 }
191 finally {
192 deleteDirectory(new File(chukwaRootDir));
193 }
194 }
195
196
197
198
199
200
201
202
203
204
205 public void testScenario4() {
206 ChukwaConfiguration cc = new ChukwaConfiguration();
207 String tempDirectory = System.getProperty("test.build.data", "/tmp");
208 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
209
210 cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
211 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
212 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
213 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
214 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
215
216 try {
217 File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
218 mrInputDir.mkdirs();
219 File mrOutputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+ CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
220 mrOutputDir.mkdirs();
221
222 File dataSinkDirectory = new File(chukwaRootDir +"/logs");
223 dataSinkDirectory.mkdirs();
224 File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink4.done");
225 dataSinkFile.createNewFile();
226
227 DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);
228 dm.start();
229
230 List<String> requireActions = new ArrayList<String>();
231
232 requireActions.add("checkDemuxOutputDir:true");
233 requireActions.add("deleteDemuxOutputDir:true");
234 requireActions.add("checkDemuxOutputDir:false");
235 requireActions.add("checkDemuxInputDir:true");
236 requireActions.add("runDemux:true");
237 requireActions.add("checkDemuxOutputDir:false");
238 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
239 requireActions.add("processData done");
240
241
242 requireActions.add("checkDemuxOutputDir:false");
243 requireActions.add("checkDemuxInputDir:false");
244 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
245 requireActions.add("runDemux:true");
246 requireActions.add("checkDemuxOutputDir:false");
247 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
248 requireActions.add("processData done");
249
250 List<String> actions = dm.actions;
251 Assert.assertTrue(requireActions.size() == actions.size());
252
253 for(int i=0;i<requireActions.size();i++) {
254 Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
255 }
256
257 } catch (Exception e) {
258 e.printStackTrace();
259 Assert.fail();
260 }
261 finally {
262 deleteDirectory(new File(chukwaRootDir));
263 }
264 }
265
266
267
268
269
270
271
272 public void testScenario5() {
273 ChukwaConfiguration cc = new ChukwaConfiguration();
274 String tempDirectory = System.getProperty("test.build.data", "/tmp");
275 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
276
277 cc.set(CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD, "file:///");
278 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
279 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
280 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
281 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
282
283 try {
284 File dataSinkDirectory = new File(chukwaRootDir +"/logs");
285 dataSinkDirectory.mkdirs();
286 File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink5-0.done");
287 dataSinkFile.createNewFile();
288
289 DemuxManagerScenario dm = new DemuxManagerScenario5(cc);
290 dm.start();
291
292 List<String> requireActions = new ArrayList<String>();
293
294
295 requireActions.add("checkDemuxOutputDir:false");
296 requireActions.add("checkDemuxInputDir:false");
297 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
298 requireActions.add("runDemux:false");
299 requireActions.add("processData done");
300
301
302 requireActions.add("checkDemuxOutputDir:true");
303 requireActions.add("deleteDemuxOutputDir:true");
304 requireActions.add("checkDemuxOutputDir:false");
305 requireActions.add("checkDemuxInputDir:true");
306 requireActions.add("runDemux:false");
307 requireActions.add("processData done");
308
309
310 requireActions.add("checkDemuxOutputDir:true");
311 requireActions.add("deleteDemuxOutputDir:true");
312 requireActions.add("checkDemuxOutputDir:false");
313 requireActions.add("checkDemuxInputDir:true");
314 requireActions.add("runDemux:false");
315 requireActions.add("processData done");
316
317
318 requireActions.add("checkDemuxOutputDir:true");
319 requireActions.add("deleteDemuxOutputDir:true");
320 requireActions.add("checkDemuxOutputDir:false");
321 requireActions.add("checkDemuxInputDir:true");
322 requireActions.add("runDemux:false");
323 requireActions.add("processData done");
324 requireActions.add("checkDemuxOutputDir:true");
325 requireActions.add("deleteDemuxOutputDir:true");
326 requireActions.add("checkDemuxOutputDir:false");
327 requireActions.add("checkDemuxInputDir:true");
328 requireActions.add("moveDataSinkFilesToDemuxErrorDirectory:true");
329
330 requireActions.add("checkDemuxOutputDir:false");
331 requireActions.add("checkDemuxInputDir:false");
332 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
333 requireActions.add("runDemux:true");
334 requireActions.add("checkDemuxOutputDir:true");
335 requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true");
336 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
337 requireActions.add("processData done");
338
339
340 List<String> actions = dm.actions;
341 Assert.assertTrue(requireActions.size() == actions.size());
342
343 for(int i=0;i<requireActions.size();i++) {
344 Assert.assertTrue( i + " - " + requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
345 }
346
347 } catch (Exception e) {
348 e.printStackTrace();
349 Assert.fail();
350 }
351 finally {
352 deleteDirectory(new File(chukwaRootDir));
353 }
354 }
355
356
357
358
359
360 public void testScenario6() {
361 ChukwaConfiguration cc = new ChukwaConfiguration();
362 String tempDirectory = System.getProperty("test.build.data", "/tmp");
363 String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis() +"/";
364
365
366 cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
367 cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
368 cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
369 cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
370 cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
371
372 try {
373
374 File dataSinkDirectory = new File(chukwaRootDir +"/logs");
375 dataSinkDirectory.mkdirs();
376 File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink6.done");
377 dataSinkFile.createNewFile();
378
379 DemuxManagerScenario dm = new DemuxManagerScenario6(cc,3);
380 dm.start();
381
382 List<String> requireActions = new ArrayList<String>();
383 for (int i=0;i<3;i++) {
384 requireActions.add("checkDemuxOutputDir:false");
385 requireActions.add("checkDemuxInputDir:false");
386 requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
387 requireActions.add("runDemux:true");
388 requireActions.add("checkDemuxOutputDir:true");
389 requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true");
390 requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
391 requireActions.add("processData done");
392 }
393
394 List<String> actions = dm.actions;
395 Assert.assertTrue(requireActions.size() == actions.size());
396
397 for(int i=0;i<requireActions.size();i++) {
398 Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern() == actions.get(i).intern());
399 }
400
401
402 } catch (Exception e) {
403 e.printStackTrace();
404 Assert.fail();
405 }
406 finally {
407 deleteDirectory(new File(chukwaRootDir));
408 }
409
410 }
411
412
413 static public boolean deleteDirectory(File path) {
414 if( path.exists() ) {
415 File[] files = path.listFiles();
416 for(int i=0; i<files.length; i++) {
417 if(files[i].isDirectory()) {
418 deleteDirectory(files[i]);
419 }
420 else {
421 files[i].delete();
422 }
423 }
424 }
425 return( path.delete() );
426 }
427
428
429
430
431
432 private static class DemuxManagerScenario6 extends DemuxManagerScenario {
433 int count = 0;
434 public DemuxManagerScenario6(ChukwaConfiguration conf, int count) throws Exception {
435 super(conf);
436 this.count = count;
437 }
438 @Override
439 public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
440 boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
441
442 try {
443
444 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
445 File mrOutputDir = new File(chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
446 mrOutputDir.mkdirs();
447 mrOutputDir.deleteOnExit();
448
449
450 File dataSinkDirectory = new File(chukwaRootDir +"logs");
451 dataSinkDirectory.mkdirs();
452 File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink6-" + count + ".done");
453 dataSinkFile.createNewFile();
454
455 }catch(Exception e) {
456 throw new RuntimeException(e);
457 }
458
459 count --;
460 if (count <= 0) {
461 this.isRunning = false;
462 }
463
464 return res;
465 }
466 }
467
468
469 private static class DemuxManagerScenario5 extends DemuxManagerScenario {
470 public DemuxManagerScenario5(ChukwaConfiguration conf) throws Exception {
471 super(conf);
472 }
473
474 boolean errorDone = false;
475 public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
476 String demuxErrorDir) throws IOException {
477 boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
478
479 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
480 File dataSinkDirectory = new File(chukwaRootDir +"logs");
481 dataSinkDirectory.mkdirs();
482 dataSinkDirectory.deleteOnExit();
483 File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink5-1.done");
484 dataSinkFile.createNewFile();
485 dataSinkFile.deleteOnExit();
486
487 errorDone = true;
488 return res;
489 }
490
491 int counter = 0;
492 @Override
493 public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
494 if (errorDone && counter >= 4) {
495 this.isRunning = false;
496 }
497
498
499 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
500 File mrOutputDir = new File(chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
501 mrOutputDir.mkdirs();
502 mrOutputDir.deleteOnExit();
503
504 counter ++;
505 this.actions.add("runDemux:" + errorDone);
506 return errorDone;
507 }
508 }
509
510 private static class DemuxManagerScenario2 extends DemuxManagerScenario {
511 public DemuxManagerScenario2(ChukwaConfiguration conf) throws Exception {
512 super(conf);
513 }
514
515 int counter = 0;
516 @Override
517 public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
518 String demuxInputDir) throws IOException {
519 if (counter == 1) {
520 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
521 File dataSinkDirectory = new File(chukwaRootDir +"logs");
522 dataSinkDirectory.mkdirs();
523 dataSinkDirectory.deleteOnExit();
524 File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink2.done");
525 dataSinkFile.createNewFile();
526 dataSinkFile.deleteOnExit();
527 }
528
529 counter ++;
530 return super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
531 }
532
533
534 @Override
535 public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
536 boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
537 if (counter > 1) {
538 this.isRunning = false;
539 }
540 return res;
541 }
542 }
543
544 private static class DemuxManagerScenario1 extends DemuxManagerScenario {
545 int count = 0;
546 public DemuxManagerScenario1(ChukwaConfiguration conf, int count) throws Exception {
547 super(conf);
548 this.count = count;
549 }
550 @Override
551 public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
552 boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
553 count --;
554 if (count <= 0) {
555 this.isRunning = false;
556 }
557
558 return res;
559 }
560 }
561
562 private static class DemuxManagerScenario extends DemuxManager {
563 public List<String>actions = new ArrayList<String>();
564
565 public DemuxManagerScenario(ChukwaConfiguration conf) throws Exception {
566 super(conf);
567 NO_DATASINK_SLEEP_TIME = 5;
568 }
569
570 @Override
571 public boolean checkDemuxInputDir(String demuxInputDir) throws IOException {
572 boolean res = super.checkDemuxInputDir(demuxInputDir);
573 this.actions.add("checkDemuxInputDir:" + res);
574 return res;
575 }
576
577 @Override
578 public boolean checkDemuxOutputDir(String demuxOutputDir)
579 throws IOException {
580 boolean res = super.checkDemuxOutputDir(demuxOutputDir);
581 this.actions.add("checkDemuxOutputDir:" + res);
582 return res;
583 }
584
585 @Override
586 public boolean moveDataSinkFilesToArchiveDirectory(String demuxInputDir,
587 String archiveDirectory) throws IOException {
588 boolean res = super.moveDataSinkFilesToArchiveDirectory(demuxInputDir,
589 archiveDirectory);
590 this.actions.add("moveDataSinkFilesToArchiveDirectory:" + res);
591 return res;
592 }
593
594 @Override
595 public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
596 String demuxErrorDir) throws IOException {
597 boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
598 this.actions.add("moveDataSinkFilesToDemuxErrorDirectory:" + res);
599 return res;
600 }
601
602 @Override
603 public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
604 String demuxInputDir) throws IOException {
605 boolean res = super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
606 this.actions.add("moveDataSinkFilesToDemuxInputDirectory:" + res);
607 return res;
608 }
609
610 @Override
611 public boolean moveDemuxOutputDirToPostProcessDirectory(
612 String demuxOutputDir, String postProcessDirectory) throws IOException {
613 boolean res = super.moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir,
614 postProcessDirectory);
615 this.actions.add("moveDemuxOutputDirToPostProcessDirectory:" + res);
616 return res;
617 }
618
619 @Override
620 public boolean processData(String dataSinkDir, String demuxInputDir,
621 String demuxOutputDir, String postProcessDir, String archiveDir)
622 throws IOException {
623 boolean res = super.processData(dataSinkDir, demuxInputDir, demuxOutputDir, postProcessDir,
624 archiveDir);
625 this.actions.add("processData done");
626 return res;
627 }
628
629 @Override
630 public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
631 boolean res = true;
632 this.actions.add("runDemux:" + res);
633 return res;
634 }
635
636 @Override
637 public boolean deleteDemuxOutputDir(String demuxOutputDir)
638 throws IOException {
639 boolean res = super.deleteDemuxOutputDir(demuxOutputDir);
640 this.actions.add("deleteDemuxOutputDir:" + res);
641 return res;
642 }
643
644 }
645
646 }