View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.List;
28  
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
31  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32  import org.apache.hadoop.chukwa.util.DaemonWatcher;
33  import org.apache.hadoop.chukwa.util.ExceptionUtil;
34  import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
35  import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.fs.PathFilter;
40  import org.apache.log4j.Logger;
41  
42  public class PostProcessorManager implements CHUKWA_CONSTANT{
43    static Logger log = Logger.getLogger(PostProcessorManager.class);
44    
45    protected static HashMap<String, String> dataSources = new HashMap<String, String>();
46    public static int errorCount = 0;
47    
48    protected int ERROR_SLEEP_TIME = 60;
49    protected ChukwaConfiguration conf = null;
50    protected FileSystem fs = null;
51    protected volatile boolean isRunning = true;
52  
53    private static final int DEFAULT_MAX_ERROR_COUNT = 4;
54    
55    final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
56      public boolean accept(Path file) {
57        return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
58      }     
59    };
60  
61    
62    public PostProcessorManager() throws Exception {
63      this.conf = new ChukwaConfiguration();
64      init();
65    }
66    
67    public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
68      this.conf = conf;
69      init();
70    }
71    
72    protected void init() throws IOException, URISyntaxException {
73      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
74      fs = FileSystem.get(new URI(fsName), conf);
75    }
76    
77    public static void main(String[] args) throws Exception {
78   
79      DaemonWatcher.createInstance("PostProcessorManager");
80      
81  
82      
83      PostProcessorManager postProcessorManager = new PostProcessorManager();
84      postProcessorManager.start();
85    }
86    
87    public void shutdown() {
88      this.isRunning = false;
89    }
90    
91    public void start() {
92      
93      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
94      if ( ! chukwaRootDir.endsWith("/") ) {
95        chukwaRootDir += "/";
96      }
97      log.info("chukwaRootDir:" + chukwaRootDir);
98      
99      String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
100     if ( ! postProcessDir.endsWith("/") ) {
101       postProcessDir += "/";
102     }
103     
104     String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
105     if ( ! chukwaRootReposDir.endsWith("/") ) {
106       chukwaRootReposDir += "/";
107     }
108  
109     String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
110     if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
111       chukwaPostProcessInErrorDir += "/";
112     }
113 
114     int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
115                                              DEFAULT_MAX_ERROR_COUNT);
116 
117     
118     dataSources = new HashMap<String, String>();
119     Path postProcessDirectory = new Path(postProcessDir);
120     while (isRunning) {
121       
122       if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
123         log.warn("==================\nToo many errors (" + errorCount +
124                  "), Bail out!\n==================");
125         DaemonWatcher.bailout(-1);
126       }
127 
128       try {
129         FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
130         List<String> directories = new ArrayList<String>();
131         for (FileStatus fstatus : demuxOutputDirs) {
132           directories.add(fstatus.getPath().getName());
133         }
134         
135         if (demuxOutputDirs.length == 0) {
136           try { Thread.sleep(10*1000);} catch(InterruptedException e) { /* do nothing*/}
137           continue;
138         }
139         
140         Collections.sort(directories);
141         
142         String directoryToBeProcessed = null;
143         long start = 0;
144         
145         for(String directory : directories) {
146           directoryToBeProcessed = postProcessDirectory + "/"+ directory;
147           
148           log.info("PostProcess Start, directory:" + directoryToBeProcessed);
149           start = System.currentTimeMillis();
150          
151           try {
152             if ( processDataLoaders(directoryToBeProcessed) == true) {
153               Path[] destFiles = movetoMainRepository(
154                 directoryToBeProcessed,chukwaRootReposDir);
155               if (destFiles != null && destFiles.length > 0) {
156                 deleteDirectory(directoryToBeProcessed);
157                 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
158                 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
159                 processPostMoveTriggers(destFiles);
160                 continue;
161               }
162               } else {
163                   log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed + ". Will try again.");
164                   if (errorCount > 3)
165                       moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
166                   else 
167                       errorCount++;
168                   continue;                
169               
170             }
171             
172             // if we are here it's because something bad happen during processing
173             log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
174             moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
175           } catch (Throwable e) {
176             log.warn("Error in processDemuxOutput:" ,e);
177           } 
178         }
179        
180       } catch (Throwable e) {
181         errorCount ++;
182         log.warn(e);
183         try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
184         catch (InterruptedException e1) {/*do nothing*/ }
185       }
186     }
187   }
188   
189   public boolean processDataLoaders(String directory) throws IOException {
190     long start = System.currentTimeMillis();
191     try {
192       String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
193       for(String dataLoaderName : classes) {
194         Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
195         java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
196             dl.getConstructor();
197         DataLoaderFactory dataloader = c.newInstance();
198         
199           //DataLoaderFactory dataLoader = (DataLoaderFactory) Class.
200           //    forName(dataLoaderName).getConstructor().newInstance();
201         log.info(dataLoaderName+" processing: "+directory);
202         StringBuilder dirSearch = new StringBuilder();
203         dirSearch.append(directory);
204         dirSearch.append("/*/*/*.evt");
205         Path demuxDir = new Path(dirSearch.toString());
206         FileStatus[] events = fs.globStatus(demuxDir);
207         dataloader.load(conf, fs, events);
208       }
209     } catch(Exception e) {
210       log.error(ExceptionUtil.getStackTrace(e));
211       return false;
212     }
213     log.info("loadData Duration:" + (System.currentTimeMillis() - start));
214     return true;
215   }
216 
217   public boolean processPostMoveTriggers(Path[] files) throws IOException {
218     long start = System.currentTimeMillis();
219     try {
220       String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
221       if (actions == null || actions.trim().length() == 0) {
222         return true;
223       }
224       log.info("PostProcess firing postMoveTriggers");
225 
226       String[] classes = actions.trim().split(",");
227       for(String actionName : classes) {
228         Class<? extends TriggerAction> actionClass =
229             (Class<? extends TriggerAction>) Class.forName(actionName);
230         java.lang.reflect.Constructor<? extends TriggerAction> c =
231             actionClass.getConstructor();
232         TriggerAction action = c.newInstance();
233 
234         log.info(actionName + " handling " + files.length + " events");
235 
236         //send the files that were just added benieth the repos/ dir.
237         FileStatus[] events = fs.listStatus(files);
238         action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
239       }
240     } catch(Exception e) {
241       log.error(ExceptionUtil.getStackTrace(e));
242       return false;
243     }
244     log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
245     return true;
246   }
247 
248   public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
249     long start = System.currentTimeMillis();
250     Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
251     log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
252     return destFiles;
253   }
254   
255   public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
256     Path inErrorDir = new Path(inErrorDirectory);
257     if (!fs.exists(inErrorDir)) {
258       fs.mkdirs(inErrorDir);
259     }
260     
261     if (inErrorDirectory.endsWith("/")) {
262       inErrorDirectory += "/";
263     }
264     String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
265     fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
266     log.warn("Error in postProcess  :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
267     return true;
268   }
269   
270   public boolean deleteDirectory(String directory) throws IOException {
271    return fs.delete(new Path(directory), true);
272   }
273   
274 }