View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.DaemonWatcher;
32  import org.apache.hadoop.conf.Configured;
33  import org.apache.hadoop.fs.FileStatus;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.FileUtil;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.mapred.FileInputFormat;
38  import org.apache.hadoop.mapred.FileOutputFormat;
39  import org.apache.hadoop.mapred.JobClient;
40  import org.apache.hadoop.mapred.JobConf;
41  import org.apache.hadoop.mapred.JobPriority;
42  import org.apache.hadoop.mapred.SequenceFileInputFormat;
43  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
44  import org.apache.hadoop.mapred.lib.IdentityMapper;
45  import org.apache.hadoop.mapred.lib.IdentityReducer;
46  import org.apache.hadoop.util.Tool;
47  import org.apache.log4j.Logger;
48  
49  // TODO do an abstract class for all rolling 
50  public class HourlyChukwaRecordRolling extends Configured implements Tool {
51    static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
52  
53    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
54    static ChukwaConfiguration conf = null;
55    static FileSystem fs = null;
56    static final String HadoopLogDir = "_logs";
57    static final String hadoopTempDir = "_temporary";
58  
59    static boolean rollInSequence = true;
60    static boolean deleteRawdata = false;
61  
62    public static void usage() {
63      System.err
64          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
65      System.exit(-1);
66    }
67  
68    public static void buildHourlyFiles(String chukwaMainRepository,
69        String tempDir, String rollingFolder, int workingDay, int workingHour)
70        throws IOException {
71      // process
72      Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
73          + workingHour);
74      FileStatus[] clustersFS = fs.listStatus(hourPath);
75      for (FileStatus clusterFs : clustersFS) {
76        String cluster = clusterFs.getPath().getName();
77  
78        Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
79            + workingDay + "/" + workingHour + "/" + cluster);
80        FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
81        
82        for (FileStatus dataSourceFS : dataSourcesFS) {
83          String dataSource = dataSourceFS.getPath().getName();
84          // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
85  
86          // put the rotate flag
87          fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
88                  + dataSource + "/" + workingDay + "/" + workingHour
89                  + "/rotateDone"));
90  
91          // rotate
92          // Merge
93          String[] mergeArgs = new String[5];
94          // input
95          mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
96              + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
97          // temp dir
98          mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
99              + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
100         // final output dir
101         mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
102             + "/" + workingDay + "/" + workingHour;
103         // final output fileName
104         mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
105         // delete rolling directory
106         mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
107             + workingHour + "/" + cluster + "/" + dataSource;
108 
109         log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
110         log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
111         log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
112         log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
113         log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
114 
115         RecordMerger merge = new RecordMerger(conf, fs,
116             new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
117         List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
118         if (rollInSequence) {
119           merge.run();
120         } else {
121           allMerge.add(merge);
122           merge.start();
123         }
124 
125         // join all Threads
126         if (!rollInSequence) {
127           while (allMerge.size() > 0) {
128             RecordMerger m = allMerge.remove(0);
129             try {
130               m.join();
131             } catch (InterruptedException e) {
132             }
133           }
134         } // End if (!rollInSequence)
135 
136         // Delete the processed dataSourceFS
137         FileUtil.fullyDelete(fs, dataSourceFS.getPath());
138 
139       } // End for(FileStatus dataSourceFS : dataSourcesFS)
140 
141       // Delete the processed clusterFs
142       FileUtil.fullyDelete(fs, clusterFs.getPath());
143 
144     } // End for(FileStatus clusterFs : clustersFS)
145 
146     // Delete the processed hour
147     FileUtil.fullyDelete(fs, hourPath);
148   }
149 
150   /**
151    * @param args
152    * @throws Exception
153    */
154   public static void main(String[] args) throws Exception {
155     DaemonWatcher.createInstance("HourlyChukwaRecordRolling");
156     
157     conf = new ChukwaConfiguration();
158     String fsName = conf.get("writer.hdfs.filesystem");
159     fs = FileSystem.get(new URI(fsName), conf);
160 
161     // TODO read from config
162     String rollingFolder = "/chukwa/rolling/";
163     String chukwaMainRepository = "/chukwa/repos/";
164     String tempDir = "/chukwa/temp/hourlyRolling/";
165 
166     // TODO do a real parameter parsing
167     if (args.length != 4) {
168       usage();
169     }
170 
171     if (!args[0].equalsIgnoreCase("rollInSequence")) {
172       usage();
173     }
174 
175     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
176       usage();
177     }
178 
179     if (args[1].equalsIgnoreCase("true")) {
180       rollInSequence = true;
181     } else {
182       rollInSequence = false;
183     }
184 
185     if (args[3].equalsIgnoreCase("true")) {
186       deleteRawdata = true;
187     } else {
188       deleteRawdata = false;
189     }
190 
191     Calendar calendar = Calendar.getInstance();
192     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
193     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
194     log.info("CurrentDay: " + currentDay);
195     log.info("currentHour" + currentHour);
196 
197     Path rootFolder = new Path(rollingFolder + "/hourly/");
198 
199     FileStatus[] daysFS = fs.listStatus(rootFolder);
200     for (FileStatus dayFS : daysFS) {
201       try {
202         log.info("dayFs:" + dayFS.getPath().getName());
203         int workingDay = Integer.parseInt(dayFS.getPath().getName());
204 
205         Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
206         FileStatus[] hoursFS = fs.listStatus(hourlySrc);
207         for (FileStatus hourFS : hoursFS) {
208           String workinhHourStr = hourFS.getPath().getName();
209           int workingHour = Integer.parseInt(workinhHourStr);
210           if ((workingDay < currentDay) || // all previous days
211               ((workingDay == currentDay) && (workingHour < currentHour)) // Up
212                                                                           // to
213                                                                           // the
214                                                                           // last
215                                                                           // hour
216           ) {
217 
218             try {
219               buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
220                   workingDay, workingHour);
221             } catch(Throwable e) {
222               e.printStackTrace();
223               log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
224             }
225 
226           } // End if ( (workingDay < currentDay) || ( (workingDay ==
227             // currentDay) && (intHour < currentHour) ) )
228         } // End for(FileStatus hourFS : hoursFS)
229       } // End Try workingDay =
230         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
231       catch (NumberFormatException e) { /* Not a standard Day directory skip */
232         log.warn("Exception in hourlyRolling:", e);
233       }
234 
235     } // for(FileStatus dayFS : daysFS)
236   }
237 
238   public int run(String[] args) throws Exception {
239     JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
240 
241     conf.setJobName("HourlyChukwa-Rolling");
242     conf.setInputFormat(SequenceFileInputFormat.class);
243 
244     conf.setMapperClass(IdentityMapper.class);
245     conf.setReducerClass(IdentityReducer.class);
246 
247     conf.setOutputKeyClass(ChukwaRecordKey.class);
248     conf.setOutputValueClass(ChukwaRecord.class);
249     conf.setOutputFormat(SequenceFileOutputFormat.class);
250 
251     log.info("HourlyChukwaRecordRolling input: " + args[0]);
252     log.info("HourlyChukwaRecordRolling output: " + args[1]);
253 
254     FileInputFormat.setInputPaths(conf, args[0]);
255     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
256     conf.setJobPriority(JobPriority.LOW);
257     conf.setNumReduceTasks(1);
258     
259     JobClient.runJob(conf);
260     return 0;
261   }
262 
263 }