View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29  import org.apache.hadoop.chukwa.util.NagiosHelper;
30  import org.apache.hadoop.chukwa.util.DaemonWatcher;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.fs.PathFilter;
35  import org.apache.hadoop.util.ToolRunner;
36  import org.apache.log4j.Logger;
37  
38  public class DemuxManager implements CHUKWA_CONSTANT {  
39    static Logger log = Logger.getLogger(DemuxManager.class);
40  
41    static int globalErrorcounter = 0;
42    static Date firstErrorTime = null;
43  
44    protected int ERROR_SLEEP_TIME = 60;
45    protected int NO_DATASINK_SLEEP_TIME = 20;
46  
47    protected int DEFAULT_MAX_ERROR_COUNT = 6;
48    protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
49    protected int DEFAULT_REDUCER_COUNT = 8;
50    
51    protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
52    protected int demuxReducerCount = 0;
53    protected ChukwaConfiguration conf = null;
54    protected FileSystem fs = null;
55    protected int reprocess = 0;
56    protected boolean sendAlert = true;
57    
58    protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
59    protected volatile boolean isRunning = true;
60  
61    final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
62      public boolean accept(Path file) {
63        return file.getName().endsWith(".done");
64      }     
65    };
66  
67  
68    public static void main(String[] args) throws Exception {
69      DaemonWatcher.createInstance("DemuxManager");
70      
71      DemuxManager manager = new DemuxManager();
72      manager.start();
73  
74    }
75  
/**
 * Creates a manager backed by a freshly built ChukwaConfiguration and
 * initializes its file system handle.
 *
 * @throws Exception if init() fails
 */
public DemuxManager() throws Exception {
  this(new ChukwaConfiguration());
}
80  
/**
 * Creates a manager that uses the given configuration and initializes its
 * file system handle.
 *
 * @param conf configuration to read all settings from
 * @throws Exception if init() fails
 */
public DemuxManager(ChukwaConfiguration conf) throws Exception {
  this.conf = conf;
  this.init();
}
85  
86    protected void init() throws IOException, URISyntaxException {
87      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
88      fs = FileSystem.get(new URI(fsName), conf);
89    }
90  
91    public void shutdown() {
92      this.isRunning = false;
93    }
94  
95  
96    public int getReprocess() {
97      return reprocess;
98    }
99  
100   /**
101    * Start the Demux Manager daemon
102    * @throws Exception
103    */
104   public void start() throws Exception {
105 
106      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
107      if ( ! chukwaRootDir.endsWith("/") ) {
108        chukwaRootDir += "/";
109      }
110      log.info("chukwaRootDir:" + chukwaRootDir);
111 
112      String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
113      String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
114      String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
115      String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
116 
117      String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
118      if ( ! dataSinkDir.endsWith("/") ) {
119        dataSinkDir += "/";
120      }
121      log.info("dataSinkDir:" + dataSinkDir);
122      
123      String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
124      if ( ! postProcessDir.endsWith("/") ) {
125        postProcessDir += "/";
126      }
127      log.info("postProcessDir:" + postProcessDir);
128      
129      String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
130      if ( ! archiveRootDir.endsWith("/") ) {
131        archiveRootDir += "/";
132      }
133      log.info("archiveRootDir:" + archiveRootDir);
134      
135      maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
136                                           DEFAULT_MAX_ERROR_COUNT);
137      demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
138      log.info("demuxReducerCount:" + demuxReducerCount);
139      
140      String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
141      int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
142      String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
143      
144      log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:" 
145          + nagiosPort + ", reportingHost:" + reportingHost);
146      
147      
148      if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
149        sendAlert = false;
150        log.warn("Alerting is OFF");
151      }
152      
153      boolean demuxReady = false;
154 
155      
156      while (isRunning) {
157        try {
158          demuxReady = false;
159 
160          if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
161            log.warn("==================\nToo many errors (" + globalErrorcounter +
162                     "), Bail out!\n==================");
163            DaemonWatcher.bailout(-1);
164          }
165          
166          // Check for anomalies
167          if (checkDemuxOutputDir(demuxOutputDir) == true) {
168            // delete current demux output dir
169            if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
170              log.warn("Cannot delete an existing demux output directory!");
171              throw new IOException("Cannot move demuxOutput to postProcess!");
172            }
173            continue;
174          } else if (checkDemuxInputDir(demuxInputDir) == true) { // dataSink already there
175            reprocess++;
176 
177            // Data has been processed more than 3 times ... move to InError directory
178            if (reprocess > 3) {
179              if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
180                log.warn("Cannot move dataSink files to DemuxErrorDir!");
181                throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
182              }
183              reprocess = 0;
184              continue;
185            }
186 
187            log.error("Demux inputDir aready contains some dataSink files,"
188                + " going to reprocess, reprocessCount=" + reprocess);
189            demuxReady = true;
190          } else { // standard code path
191            reprocess = 0;
192            // Move new dataSink Files
193            if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
194              demuxReady = true; // if any are available
195            } else {
196              demuxReady = false; // if none
197            }
198          }
199 
200          // start a new demux ?
201          if (demuxReady == true) {
202           boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
203                postProcessDir, archiveRootDir);
204           sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
205 
206           // if demux suceeds, then we reset these.
207           if (demuxStatus) {
208            globalErrorcounter = 0;
209            firstErrorTime = null;
210           }
211          } else {
212            log.info("Demux not ready so going to sleep ...");
213            Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
214          }
215        }catch(Throwable e) {
216          globalErrorcounter ++;
217          if (firstErrorTime == null) firstErrorTime = new Date();
218 
219          log.warn("Consecutive error number " + globalErrorcounter +
220                   " encountered since " + firstErrorTime, e);
221          sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
222          try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
223          catch (InterruptedException e1) {/*do nothing*/ }
224          init();
225        }
226      }
227    }
228 
229 
230    /**
231     * Send NSCA status to Nagios
232     * @param nagiosHost
233     * @param nagiosPort
234     * @param reportingHost
235     * @param demuxInErrorDir
236     * @param demuxStatus
237     * @param exception
238     */
239   protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
240         String demuxInErrorDir,boolean demuxStatus,String demuxException) {
241       
242      if (sendAlert == false) {
243        return;
244      }
245       
246      boolean demuxInErrorStatus = true;
247      String demuxInErrorMsg = "";
248      try {
249        Path pDemuxInErrorDir = new Path(demuxInErrorDir);
250        if ( fs.exists(pDemuxInErrorDir)) {
251          FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
252          if (demuxInErrorDirs.length == 0) {
253            demuxInErrorStatus = false;
254          }          
255        }
256      } catch (Throwable e) {
257        demuxInErrorMsg = e.getMessage();
258        log.warn(e);
259      }
260      
261      // send Demux status
262      if (demuxStatus == true) {
263        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux OK",NagiosHelper.NAGIOS_OK);
264      } else {
265        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
266      }
267      
268      // send DemuxInErrorStatus
269      if (demuxInErrorStatus == false) {
270        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError OK",NagiosHelper.NAGIOS_OK);
271      } else {
272        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
273      }
274      
275    }
276    
277    /**
278     * Process Data, i.e. 
279     * - run demux
280     * - move demux output to postProcessDir
281     * - move dataSink file to archiveDir
282     * 
283     * @param dataSinkDir
284     * @param demuxInputDir
285     * @param demuxOutputDir
286     * @param postProcessDir
287     * @param archiveDir
288     * @return True iff succeed
289     * @throws IOException
290     */
291     protected boolean processData(String dataSinkDir, String demuxInputDir,
292        String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
293 
294      boolean demuxStatus = false;
295 
296      long startTime = System.currentTimeMillis();
297      demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
298      log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
299 
300      if (demuxStatus == false) {
301        log.warn("Demux failed!");
302      } else {
303 
304        // Move demux output to postProcessDir 
305        if (checkDemuxOutputDir(demuxOutputDir)) {
306          if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
307            log.warn("Cannot move demuxOutput to postProcess! bail out!");
308            throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
309          } 
310        } else {
311          log.warn("Demux processing OK but no output");
312        }
313 
314        // Move DataSink Files to archiveDir
315        if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
316          log.warn("Cannot move datasinkFile to archive! bail out!");
317          throw new IOException("Cannot move datasinkFile to archive! bail out!");
318        }
319      }
320      
321      return demuxStatus;
322    }
323 
324 
325    /**
326     * Submit and Run demux Job 
327     * @param demuxInputDir
328     * @param demuxOutputDir
329     * @return true id Demux succeed
330     */
331    protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
332      String[] demuxParams;
333      int i=0;
334      Demux.addParsers(conf);
335      demuxParams = new String[4];
336      demuxParams[i++] = "-r";
337      demuxParams[i++] = "" + demuxReducerCount;
338      demuxParams[i++] = demuxInputDir;
339      demuxParams[i++] = demuxOutputDir;
340      try {
341        return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
342      } catch (Throwable e) {
343        e.printStackTrace();
344        globalErrorcounter ++;
345        if (firstErrorTime == null) firstErrorTime = new Date();
346        log.error("Failed to run demux. Consecutive error number " +
347                globalErrorcounter + " encountered since " + firstErrorTime, e);
348      }
349      return false;
350    }
351 
352 
353 
354    /**
355     * Move dataSink files to Demux input directory
356     * @param dataSinkDir
357     * @param demuxInputDir
358     * @return true if there's any dataSink files ready to be processed
359     * @throws IOException
360     */
361    protected boolean moveDataSinkFilesToDemuxInputDirectory(
362        String dataSinkDir, String demuxInputDir) throws IOException {
363      Path pDataSinkDir = new Path(dataSinkDir);
364      Path pDemuxInputDir = new Path(demuxInputDir);
365      log.info("dataSinkDir: " + dataSinkDir);
366      log.info("demuxInputDir: " + demuxInputDir);
367 
368 
369      boolean containsFile = false;
370 
371      FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
372      if (dataSinkFiles.length > 0) {
373        setup(pDemuxInputDir);
374      }
375 
376      int maxFilesPerDemux = 0;
377      for (FileStatus fstatus : dataSinkFiles) {
378        boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
379        log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
380        maxFilesPerDemux ++;
381        containsFile = true;
382        if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
383          log.info("Max File per Demux reached:" + maxFilesPerDemux);
384          break;
385        }
386      }
387      return containsFile;
388    }
389 
390 
391 
392 
393    /**
394     * Move sourceFolder inside destFolder
395     * @param dataSinkDir : ex chukwa/demux/inputDir
396     * @param demuxErrorDir : ex /chukwa/demux/inError
397     * @return true if able to move chukwa/demux/inputDir to /chukwa/demux/inError/<YYYYMMDD>/demuxInputDirXXX
398     * @throws IOException
399     */
400    protected boolean moveDataSinkFilesToDemuxErrorDirectory(
401        String dataSinkDir, String demuxErrorDir) throws IOException {
402      demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
403      return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
404    }
405 
406    /**
407     * Move sourceFolder inside destFolder
408     * @param demuxInputDir: ex chukwa/demux/inputDir
409     * @param archiveDirectory: ex /chukwa/archives
410     * @return true if able to move chukwa/demux/inputDir to /chukwa/archives/raw/<YYYYMMDD>/dataSinkDirXXX
411     * @throws IOException
412     */
413    protected boolean moveDataSinkFilesToArchiveDirectory(
414        String demuxInputDir, String archiveDirectory) throws IOException {
415      archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
416      return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
417    }
418 
419    /**
420     * Move sourceFolder inside destFolder
421     * @param demuxOutputDir: ex chukwa/demux/outputDir 
422     * @param postProcessDirectory: ex /chukwa/postProcess
423     * @return true if able to move chukwa/demux/outputDir to /chukwa/postProcess/demuxOutputDirXXX
424     * @throws IOException 
425     */
426    protected  boolean moveDemuxOutputDirToPostProcessDirectory(
427        String demuxOutputDir, String postProcessDirectory) throws IOException {
428      return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
429    }
430 
431 
432    /**
433     * Test if demuxInputDir exists
434     * @param demuxInputDir
435     * @return true if demuxInputDir exists
436     * @throws IOException
437     */
438    protected boolean checkDemuxInputDir(String demuxInputDir)
439    throws IOException {
440      return dirExists(demuxInputDir);
441    }
442 
443    /**
444     * Test if demuxOutputDir exists
445     * @param demuxOutputDir
446     * @return true if demuxOutputDir exists
447     * @throws IOException
448     */
449    protected boolean checkDemuxOutputDir(String demuxOutputDir)
450    throws IOException {
451      return dirExists(demuxOutputDir);
452    }
453 
454 
455    /**
456     * Delete DemuxOutput directory
457     * @param demuxOutputDir
458     * @return true if succeed
459     * @throws IOException
460     */
461    protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
462    {
463      return fs.delete(new Path(demuxOutputDir), true);
464    }
465 
466    /**
467     * Create directory if !exists
468     * @param directory
469     * @throws IOException
470     */
471    protected void setup(Path directory) throws IOException {
472       if ( ! fs.exists(directory)) {
473         fs.mkdirs(directory);
474       }
475     }
476 
477     /** 
478      * Check if source exists and if source is a directory
479      * @param f source file
480      */
481    protected boolean dirExists(String directory) throws IOException {
482       Path pDirectory = new Path(directory);
483       return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
484     }
485 
486     /**
487      * Move sourceFolder inside destFolder
488      * @param srcDir
489      * @param destDir
490      * @return
491      * @throws IOException
492      */ 
493    protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
494       if (!destDir.endsWith("/")) {
495         destDir +="/";
496       }
497       Path pSrcDir = new Path(srcDir);
498       Path pDestDir = new Path(destDir );
499       setup(pDestDir);
500       destDir += prefix +"_" +System.currentTimeMillis();
501       Path pFinalDestDir = new Path(destDir );
502 
503       return fs.rename(pSrcDir, pFinalDestDir);
504     }
505 }