View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.DaemonWatcher;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  import org.apache.hadoop.conf.Configured;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.FileUtil;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.mapred.FileInputFormat;
39  import org.apache.hadoop.mapred.FileOutputFormat;
40  import org.apache.hadoop.mapred.JobClient;
41  import org.apache.hadoop.mapred.JobConf;
42  import org.apache.hadoop.mapred.JobPriority;
43  import org.apache.hadoop.mapred.SequenceFileInputFormat;
44  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45  import org.apache.hadoop.mapred.lib.IdentityMapper;
46  import org.apache.hadoop.mapred.lib.IdentityReducer;
47  import org.apache.hadoop.util.Tool;
48  import org.apache.log4j.Logger;
49  
50  // TODO do an abstract class for all rolling 
51  public class DailyChukwaRecordRolling extends Configured implements Tool {
52    static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
53  
54    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
55    static ChukwaConfiguration conf = null;
56    static FileSystem fs = null;
57    static final String HadoopLogDir = "_logs";
58    static final String hadoopTempDir = "_temporary";
59  
60    static boolean rollInSequence = true;
61    static boolean deleteRawdata = false;
62  
63    public static void usage() {
64      System.err
65          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
66      System.exit(-1);
67    }
68  
69    public static boolean hourlyRolling(String dailyStreamDirectory) {
70     
71      Path pHour = null;
72      try {
73        log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
74        
75        for (int i=0;i<24;i++) {
76          pHour = new Path(dailyStreamDirectory + "/" + i);
77          if (! fs.exists(pHour)) {
78            log.info("HourlyData is missing for:" + pHour);
79            continue;
80          } else {
81            FileStatus[] files = fs.listStatus(pHour);
82            boolean containsHourly = false;
83            for(FileStatus file: files) {
84              log.info("Debug checking" + file.getPath());
85              if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
86                containsHourly = true;
87                break;
88              }
89            }
90            if (containsHourly == false) {
91              log.info("HourlyDone is missing for : " + pHour);
92              return false;
93            }
94          }
95        }
96        return true;
97      }catch(Exception e) {
98        e.printStackTrace();
99        return false;
100     }
101   }
102   public static void buildDailyFiles(String chukwaMainRepository,
103       String tempDir, String rollingFolder, int workingDay) throws IOException {
104     // process
105     
106     boolean alldone = true;
107     
108     Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
109     FileStatus[] clustersFS = fs.listStatus(dayPath);
110     for (FileStatus clusterFs : clustersFS) {
111       String cluster = clusterFs.getPath().getName();
112 
113       Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
114           + workingDay + "/" + cluster);
115       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
116       for (FileStatus dataSourceFS : dataSourcesFS) {
117         String dataSource = dataSourceFS.getPath().getName();
118         // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
119 
120 
121         // put the rotate flag
122         fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
123             + dataSource + "/" + workingDay + "/rotateDone"));
124         
125         if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
126           log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
127             + dataSource + "/" + workingDay );
128           alldone = false;
129           continue;
130         } 
131         
132         log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
133             + dataSource + "/" + workingDay + "/rotateDone");
134         
135         // rotate
136         // Merge
137         String[] mergeArgs = new String[5];
138         // input
139         mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
140             + "/" + workingDay + "/[0-9]*/*.evt";
141         // temp dir
142         mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
143             + workingDay + "_" + System.currentTimeMillis();
144         // final output dir
145         mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
146             + "/" + workingDay;
147         // final output fileName
148         mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
149         // delete rolling directory
150         mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
151             + "/" + dataSource;
152 
153         log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
154         log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
155         log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
156         log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
157         log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
158 
159         RecordMerger merge = new RecordMerger(conf, fs,
160             new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
161         List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
162         if (rollInSequence) {
163           merge.run();
164         } else {
165           allMerge.add(merge);
166           merge.start();
167         }
168 
169         // join all Threads
170         if (!rollInSequence) {
171           while (allMerge.size() > 0) {
172             RecordMerger m = allMerge.remove(0);
173             try {
174               m.join();
175             } catch (InterruptedException e) {
176             }
177           }
178         } // End if (!rollInSequence)
179 
180         // Delete the processed dataSourceFS
181           FileUtil.fullyDelete(fs, dataSourceFS.getPath());
182 
183       } // End for(FileStatus dataSourceFS : dataSourcesFS)
184 
185       // Delete the processed clusterFs
186       if (alldone == true) {
187         FileUtil.fullyDelete(fs, clusterFs.getPath());
188       }
189       
190 
191     } // End for(FileStatus clusterFs : clustersFS)
192 
193     // Delete the processed dayPath
194     if (alldone == true) {
195       FileUtil.fullyDelete(fs, dayPath);
196     }
197     
198   }
199 
200   /**
201    * @param args
202    * @throws Exception
203    */
204   public static void main(String[] args) throws Exception {
205     
206     DaemonWatcher.createInstance("DailyChukwaRecordRolling");
207     
208     conf = new ChukwaConfiguration();
209     String fsName = conf.get("writer.hdfs.filesystem");
210     fs = FileSystem.get(new URI(fsName), conf);
211 
212     // TODO read from config
213     String rollingFolder = "/chukwa/rolling/";
214     String chukwaMainRepository = "/chukwa/repos/";
215     String tempDir = "/chukwa/temp/dailyRolling/";
216 
217     // TODO do a real parameter parsing
218     if (args.length != 4) {
219       usage();
220     }
221 
222     if (!args[0].equalsIgnoreCase("rollInSequence")) {
223       usage();
224     }
225 
226     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
227       usage();
228     }
229 
230     if (args[1].equalsIgnoreCase("true")) {
231       rollInSequence = true;
232     } else {
233       rollInSequence = false;
234     }
235 
236     if (args[3].equalsIgnoreCase("true")) {
237       deleteRawdata = true;
238     } else {
239       deleteRawdata = false;
240     }
241 
242     log.info("rollInSequence: " + rollInSequence);
243     log.info("deleteRawdata: " + deleteRawdata);
244 
245     Calendar calendar = Calendar.getInstance();
246     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
247     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
248     log.info("CurrentDay: " + currentDay);
249     log.info("currentHour" + currentHour);
250 
251     Path rootFolder = new Path(rollingFolder + "/daily/");
252 
253     FileStatus[] daysFS = fs.listStatus(rootFolder);
254     for (FileStatus dayFS : daysFS) {
255       try {
256         int workingDay = Integer.parseInt(dayFS.getPath().getName());
257         log.info("Daily working on :" + workingDay);
258         if (workingDay < currentDay) {
259           
260           try {
261             buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
262                 workingDay);
263           } catch(Throwable e) {
264             e.printStackTrace();
265             log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay  ) ;
266           }
267           
268         } // End if ( workingDay < currentDay)
269       } // End Try workingDay =
270         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
271       catch (NumberFormatException e) { /* Not a standard Day directory skip */
272         log.debug(ExceptionUtil.getStackTrace(e));
273       }
274 
275     } // for(FileStatus dayFS : daysFS)
276   }
277 
278   public int run(String[] args) throws Exception {
279     JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
280 
281     conf.setJobName("DailyChukwa-Rolling");
282     conf.setInputFormat(SequenceFileInputFormat.class);
283 
284     conf.setMapperClass(IdentityMapper.class);
285     conf.setReducerClass(IdentityReducer.class);
286 
287     conf.setOutputKeyClass(ChukwaRecordKey.class);
288     conf.setOutputValueClass(ChukwaRecord.class);
289     conf.setOutputFormat(SequenceFileOutputFormat.class);
290 
291     log.info("DailyChukwaRecordRolling input: " + args[0]);
292     log.info("DailyChukwaRecordRolling output: " + args[1]);
293 
294     FileInputFormat.setInputPaths(conf, args[0]);
295     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
296     conf.setJobPriority(JobPriority.LOW);
297     conf.setNumReduceTasks(1);
298     JobClient.runJob(conf);
299     return 0;
300   }
301 
302 }