View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.Calendar;
26  import java.util.Collection;
27  import java.util.HashSet;
28  
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.FileUtil;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.log4j.Logger;
35  
// TODO: This is the first version of the spill logic; it still needs
// some polishing.
39  
40  public class MoveToRepository {
41    static Logger log = Logger.getLogger(MoveToRepository.class);
42  
43    static ChukwaConfiguration conf = null;
44    static FileSystem fs = null;
45    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
46    static Calendar calendar = Calendar.getInstance();
47  
48    static Collection<Path> processClusterDirectory(Path srcDir, String destDir)
49        throws Exception {
50      log.info("processClusterDirectory (" + srcDir.getName() + "," + destDir
51          + ")");
52      FileStatus fstat = fs.getFileStatus(srcDir);
53      Collection<Path> destFiles = new HashSet<Path>();
54  
55      if (!fstat.isDir()) {
56        throw new IOException(srcDir + " is not a directory!");
57      } else {
58        FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
59  
60        for (FileStatus datasourceDirectory : datasourceDirectories) {
61          log.info(datasourceDirectory.getPath() + " isDir?"
62              + datasourceDirectory.isDir());
63          if (!datasourceDirectory.isDir()) {
64            throw new IOException(
65                "Top level datasource directory should be a directory :"
66                    + datasourceDirectory.getPath());
67          }
68  
69          String dirName = datasourceDirectory.getPath().getName();
70          Path destPath = new Path(destDir + "/" + dirName);
71          log.info("dest directory path: " + destPath);
72          log.info("processClusterDirectory processing Datasource: (" + dirName
73              + ")");
74          destFiles.addAll(processDatasourceDirectory(srcDir.getName(),
75              datasourceDirectory.getPath(), destDir + "/" + dirName));
76        }
77      }
78      return destFiles;
79    }
80  
81    static Collection<Path> processDatasourceDirectory(String cluster, Path srcDir,
82        String destDir) throws Exception {
83      Collection<Path> destFiles = new HashSet<Path>();
84      String fileName = null;
85      int fileDay = 0;
86      int fileHour = 0;
87      int fileMin = 0;
88  
89      FileStatus[] recordFiles = fs.listStatus(srcDir);
90      for (FileStatus recordFile : recordFiles) {
91        // dataSource_20080915_18_15.1.evt
92        // <datasource>_<yyyyMMdd_HH_mm>.1.evt
93  
94        fileName = recordFile.getPath().getName();
95        log.info("processDatasourceDirectory processing RecordFile: (" + fileName
96            + ")");
97        log.info("fileName: " + fileName);
98  
99        int l = fileName.length();
100       String dataSource = srcDir.getName();
101       log.info("Datasource: " + dataSource);
102 
103       if (fileName.endsWith(".D.evt")) {
104         // Hadoop_dfs_datanode_20080919.D.evt
105 
106         fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
107         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
108             recordFile.getPath(), dataSource + "_" + fileDay);
109         if (destFile != null) {
110           destFiles.add(destFile);
111         }
112       } else if (fileName.endsWith(".H.evt")) {
113         // Hadoop_dfs_datanode_20080925_1.H.evt
114         // Hadoop_dfs_datanode_20080925_12.H.evt
115 
116         String day = null;
117         String hour = null;
118         if (fileName.charAt(l - 8) == '_') {
119           day = fileName.substring(l - 16, l - 8);
120           log.info("day->" + day);
121           hour = "" + fileName.charAt(l - 7);
122           log.info("hour->" + hour);
123         } else {
124           day = fileName.substring(l - 17, l - 9);
125           log.info("day->" + day);
126           hour = fileName.substring(l - 8, l - 6);
127           log.info("hour->" + hour);
128         }
129         fileDay = Integer.parseInt(day);
130         fileHour = Integer.parseInt(hour);
131         // rotate there so spill
132         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
133             recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
134         if (destFile != null) {
135           destFiles.add(destFile);
136         }
137         // mark this directory for daily rotate
138         addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
139       } else if (fileName.endsWith(".R.evt")) {
140         if (fileName.charAt(l - 11) == '_') {
141           fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
142           fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
143           fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
144         } else {
145           fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
146           fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
147           fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
148         }
149 
150         log.info("fileDay: " + fileDay);
151         log.info("fileHour: " + fileHour);
152         log.info("fileMin: " + fileMin);
153         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
154             + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
155             + fileHour + "_" + fileMin);
156         if (destFile != null) {
157           destFiles.add(destFile);
158         }
159         // mark this directory for hourly rotate
160         addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
161       } else {
162         throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
163       }
164     }
165 
166     return destFiles;
167   }
168 
169   static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
170       String cluster, String dataSource) throws IOException {
171     // TODO get root directory from config
172     String rollingDirectory = "/chukwa/rolling/";
173 
174     Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
175         + "/" + dataSource);
176     if (!fs.exists(path)) {
177       fs.mkdirs(path);
178     }
179 
180     if (!isDailyOnly) {
181       path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
182           + cluster + "/" + dataSource);
183       if (!fs.exists(path)) {
184         fs.mkdirs(path);
185       }
186     }
187   }
188 
189   static Path writeRecordFile(String destDir, Path recordFile, String fileName)
190       throws IOException {
191     boolean done = false;
192     int count = 1;
193     do {
194       Path destDirPath = new Path(destDir);
195       Path destFilePath = new Path(destDir + "/" + fileName + "." + count
196           + ".evt");
197 
198       if (!fs.exists(destDirPath)) {
199         fs.mkdirs(destDirPath);
200         log.info(">>>>>>>>>>>> create Dir" + destDirPath);
201       }
202 
203       if (!fs.exists(destFilePath)) {
204         log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
205             + destFilePath);
206         boolean rename = fs.rename(recordFile,destFilePath);
207         done = true;
208         log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
209         return destFilePath;
210       } 
211       count++;
212 
213       if (count > 1000) {
214         log.warn("too many files in this directory: " + destDir);
215       }
216     } while (!done);
217 
218     return null;
219   }
220 
221   static boolean checkRotate(String directoryAsString,
222       boolean createDirectoryIfNotExist) throws IOException {
223     Path directory = new Path(directoryAsString);
224     boolean exist = fs.exists(directory);
225 
226     if (!exist) {
227       if (createDirectoryIfNotExist == true) {
228         fs.mkdirs(directory);
229       }
230       return false;
231     } else {
232       return fs.exists(new Path(directoryAsString + "/rotateDone"));
233     }
234   }
235 
236   public static Path[] doMove(Path srcDir, String destDir) throws Exception {
237     conf = new ChukwaConfiguration();
238     String fsName = conf.get("writer.hdfs.filesystem");
239     fs = FileSystem.get(new URI(fsName), conf);
240     log.info("Start MoveToRepository doMove()");
241 
242     FileStatus fstat = fs.getFileStatus(srcDir);
243 
244     Collection<Path> destinationFiles = new HashSet<Path>();
245     if (!fstat.isDir()) {
246       throw new IOException(srcDir + " is not a directory!");
247     } else {
248       FileStatus[] clusters = fs.listStatus(srcDir);
249       // Run a moveOrMerge on all clusters
250       String name = null;
251       for (FileStatus cluster : clusters) {
252         name = cluster.getPath().getName();
253         // Skip hadoop M/R outputDir
254         if (name.startsWith("_")) {
255           continue;
256         }
257         log.info("main procesing Cluster (" + cluster.getPath().getName() + ")");
258         destinationFiles.addAll(processClusterDirectory(cluster.getPath(),
259             destDir + "/" + cluster.getPath().getName()));
260 
261         // Delete the demux's cluster dir
262         FileUtil.fullyDelete(fs, cluster.getPath());
263       }
264     }
265 
266     log.info("Done with MoveToRepository doMove()");
267     return destinationFiles.toArray(new Path[destinationFiles.size()]);
268   }
269 
270   /**
271    * @param args
272    * @throws Exception
273    */
274   public static void main(String[] args) throws Exception {
275 
276     Path srcDir = new Path(args[0]);
277     String destDir = args[1];
278     doMove(srcDir, destDir);
279   }
280 
281 }