View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.archive;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.text.SimpleDateFormat;
25  
26  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
28  import org.apache.hadoop.chukwa.util.DaemonWatcher;
29  import org.apache.hadoop.fs.FileStatus;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.util.ToolRunner;
33  import org.apache.log4j.Logger;
34  
35  public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
36    static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
37    static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
38    
39    static final  int ONE_HOUR = 60 * 60 * 1000;
40    static final int ONE_DAY = 24*ONE_HOUR;
41    static final int MAX_FILES = 500;
42  
43    private static final int DEFAULT_MAX_ERROR_COUNT = 4;
44  
45    protected ChukwaConfiguration conf = null;
46    protected FileSystem fs = null;
47    protected boolean isRunning = true;
48    
49    public ChukwaArchiveManager() throws Exception { 
50      conf = new ChukwaConfiguration();
51      init();
52    }
53  
54    protected void init() throws IOException, URISyntaxException {
55      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
56      fs = FileSystem.get(new URI(fsName), conf);
57    }
58  
59    public static void main(String[] args) throws Exception {
60      DaemonWatcher.createInstance("ArchiveManager");
61      
62      ChukwaArchiveManager manager = new ChukwaArchiveManager();
63      manager.start();
64    }
65  
66    public void shutdown() {
67      this.isRunning = false;
68    }
69    
70    public void start() throws Exception {
71      
72      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
73      if ( ! chukwaRootDir.endsWith("/") ) {
74        chukwaRootDir += "/";
75      }
76      log.info("chukwaRootDir:" + chukwaRootDir);
77      
78      String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
79      if ( ! archiveRootDir.endsWith("/") ) {
80        archiveRootDir += "/";
81      }
82      log.info("archiveDir:" + archiveRootDir);
83      Path pArchiveRootDir = new Path(archiveRootDir);
84      setup(pArchiveRootDir);
85      
86      String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
87      // String archivesErrorDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME;
88      String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
89      String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
90      String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
91  
92      int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
93                                               DEFAULT_MAX_ERROR_COUNT);
94      
95      Path pDailyRawArchivesInput = new Path(archiveRootDir);
96      Path pArchivesMRInputDir = new Path(archivesMRInputDir);
97      Path pArchivesRootProcessingDir = new Path(archivesRootProcessingDir);
98      Path pFinalArchiveOutput =  new Path(finalArchiveOutput);
99      
100     
101     if (!archivesMRInputDir.endsWith("/")) {
102       archivesMRInputDir +="/";
103     }
104     setup( pArchivesRootProcessingDir );
105     setup( pDailyRawArchivesInput );
106     setup( pFinalArchiveOutput );
107     
108     int errorCount = 0;
109     
110     long lastRun = 0l;
111     
112     while (isRunning) {
113       try {
114         
115         if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
116           log.warn("==================\nToo many errors (" + errorCount +
117                    "), Bail out!\n==================");
118           DaemonWatcher.bailout(-1);
119         }
120         // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
121         //  to
122         // /chukwa/archives/final/<YYYYMMDD>_<TS>
123         
124         if (fs.exists(pArchivesMRInputDir)) {
125           FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
126           if (days.length > 0) {
127             log.info("reprocessing current Archive input" +  days[0].getPath());
128             
129             runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);  
130             errorCount = 0;
131             continue;
132           }
133         }
134         
135         
136         log.info("Raw Archive dir:" + pDailyRawArchivesInput);
137         long now = System.currentTimeMillis();
138         int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
139         FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);
140         
141         if (daysInRawArchiveDir.length == 0 ) { 
142           log.debug( pDailyRawArchivesInput + " is empty, going to sleep for 1 minute"); 
143           Thread.sleep(1 * 60 * 1000); 
144           continue; 
145         } 
146         // We don't want to process DataSink file more than once every 2 hours
147         // for current day
148         if (daysInRawArchiveDir.length == 1 ) {
149           int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
150           long nextRun = lastRun + (2*ONE_HOUR) - (1*60*1000);// 2h -1min
151           if (workingDay == currentDay && now < nextRun) {
152             log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
153             Thread.sleep(30 * 60 * 1000);
154             continue;
155           }
156         }
157         
158         String dayArchivesMRInputDir = null;
159         for (FileStatus fsDay : daysInRawArchiveDir) {
160           dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
161           processDay(fsDay, dayArchivesMRInputDir,archivesMROutputDir, finalArchiveOutput);
162           lastRun = now;
163         }
164         
165       }catch (Throwable e) {
166         errorCount ++;
167         e.printStackTrace();
168         log.warn(e);
169       }
170       
171     }
172     
173   }
174   
175   public void processDay(FileStatus fsDay, String archivesMRInputDir,
176       String archivesMROutputDir,String finalArchiveOutput) throws Exception {
177     FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(fsDay.getPath());
178     long now = System.currentTimeMillis();
179     
180     int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
181     int workingDay = Integer.parseInt(fsDay.getPath().getName());
182     
183     long oneHourAgo = now -  ONE_HOUR;
184     if (dataSinkDirsInRawArchiveDir.length == 0 && workingDay < currentDay) {
185       fs.delete(fsDay.getPath(),false);
186       log.info("deleting raw dataSink dir for day:" + fsDay.getPath().getName());
187       return;
188     }
189     
190     int fileCount = 0;
191     for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
192       long modificationDate = fsDataSinkDir.getModificationTime();
193       if (modificationDate < oneHourAgo || workingDay < currentDay) {
194         log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
195         fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
196         moveDataSinkFilesToArchiveMrInput(fsDataSinkDir,archivesMRInputDir);
197         // process no more than MAX_FILES directories
198         if (fileCount >= MAX_FILES) {
199           log.info("processDay, reach capacity");
200           runArchive(archivesMRInputDir,archivesMROutputDir,finalArchiveOutput);  
201           fileCount = 0;
202         } else {
203           log.info("processDay,modificationDate:" + modificationDate +", skipping: " + fsDataSinkDir.getPath() );
204         }
205       }
206     }    
207   }
208   
209   public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
210       String finalArchiveOutput) throws Exception {
211     String[] args = new String[3];
212     
213     
214     args[0] = conf.get("archive.grouper","Stream");
215     args[1] = archivesMRInputDir + "*/*.done" ;
216     args[2] = archivesMROutputDir;
217     
218     Path pArchivesMRInputDir = new Path(archivesMRInputDir);
219     Path pArchivesMROutputDir = new Path(archivesMROutputDir);
220 
221     
222     if (fs.exists(pArchivesMROutputDir)) {
223       log.warn("Deleteing mroutput dir for archive ...");
224       fs.delete(pArchivesMROutputDir, true);
225     }
226     
227     log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
228     int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(),args);
229     log.info("Archive result: " + res);
230     if (res != 0) {
231       throw new Exception("Archive result != 0");
232     }
233    
234     if (!finalArchiveOutput.endsWith("/")) {
235       finalArchiveOutput +="/";
236     }
237     String day = pArchivesMRInputDir.getName();
238     finalArchiveOutput += day;
239     Path pDay = new Path(finalArchiveOutput);
240     setup(pDay);
241     
242     finalArchiveOutput += "/archive_" + System.currentTimeMillis();
243     Path pFinalArchiveOutput = new Path(finalArchiveOutput);
244     
245     log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);
246     
247     if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
248       log.info("deleting " + pArchivesMRInputDir);
249       fs.delete(pArchivesMRInputDir, true);
250     } else {
251       log.warn("move to final archive folder failed!");
252     }
253     
254 
255     
256   }
257   
258   public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
259       String archivesMRInputDir) throws IOException {
260     
261     if (!archivesMRInputDir.endsWith("/")) {
262       archivesMRInputDir +="/";
263     }
264     
265     Path pArchivesMRInputDir = new Path(archivesMRInputDir);
266     setup(pArchivesMRInputDir);
267     fs.rename(fsDataSinkDir.getPath(), pArchivesMRInputDir);
268     log.info("moving " + fsDataSinkDir.getPath() + " to " + pArchivesMRInputDir);
269   }
270   
271   /**
272    * Create directory if !exists
273    * @param directory
274    * @throws IOException
275    */
276   protected void setup(Path directory) throws IOException {
277      if ( ! fs.exists(directory)) {
278        fs.mkdirs(directory);
279      }
280   }
281  
282 }