1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.HashSet;
28
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.FileUtil;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.log4j.Logger;
35
36
37
38
39
40 public class MoveToRepository {
41 static Logger log = Logger.getLogger(MoveToRepository.class);
42
43 static ChukwaConfiguration conf = null;
44 static FileSystem fs = null;
45 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
46 static Calendar calendar = Calendar.getInstance();
47
48 static Collection<Path> processClusterDirectory(Path srcDir, String destDir)
49 throws Exception {
50 log.info("processClusterDirectory (" + srcDir.getName() + "," + destDir
51 + ")");
52 FileStatus fstat = fs.getFileStatus(srcDir);
53 Collection<Path> destFiles = new HashSet<Path>();
54
55 if (!fstat.isDir()) {
56 throw new IOException(srcDir + " is not a directory!");
57 } else {
58 FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
59
60 for (FileStatus datasourceDirectory : datasourceDirectories) {
61 log.info(datasourceDirectory.getPath() + " isDir?"
62 + datasourceDirectory.isDir());
63 if (!datasourceDirectory.isDir()) {
64 throw new IOException(
65 "Top level datasource directory should be a directory :"
66 + datasourceDirectory.getPath());
67 }
68
69 String dirName = datasourceDirectory.getPath().getName();
70 Path destPath = new Path(destDir + "/" + dirName);
71 log.info("dest directory path: " + destPath);
72 log.info("processClusterDirectory processing Datasource: (" + dirName
73 + ")");
74 destFiles.addAll(processDatasourceDirectory(srcDir.getName(),
75 datasourceDirectory.getPath(), destDir + "/" + dirName));
76 }
77 }
78 return destFiles;
79 }
80
81 static Collection<Path> processDatasourceDirectory(String cluster, Path srcDir,
82 String destDir) throws Exception {
83 Collection<Path> destFiles = new HashSet<Path>();
84 String fileName = null;
85 int fileDay = 0;
86 int fileHour = 0;
87 int fileMin = 0;
88
89 FileStatus[] recordFiles = fs.listStatus(srcDir);
90 for (FileStatus recordFile : recordFiles) {
91
92
93
94 fileName = recordFile.getPath().getName();
95 log.info("processDatasourceDirectory processing RecordFile: (" + fileName
96 + ")");
97 log.info("fileName: " + fileName);
98
99 int l = fileName.length();
100 String dataSource = srcDir.getName();
101 log.info("Datasource: " + dataSource);
102
103 if (fileName.endsWith(".D.evt")) {
104
105
106 fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
107 Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
108 recordFile.getPath(), dataSource + "_" + fileDay);
109 if (destFile != null) {
110 destFiles.add(destFile);
111 }
112 } else if (fileName.endsWith(".H.evt")) {
113
114
115
116 String day = null;
117 String hour = null;
118 if (fileName.charAt(l - 8) == '_') {
119 day = fileName.substring(l - 16, l - 8);
120 log.info("day->" + day);
121 hour = "" + fileName.charAt(l - 7);
122 log.info("hour->" + hour);
123 } else {
124 day = fileName.substring(l - 17, l - 9);
125 log.info("day->" + day);
126 hour = fileName.substring(l - 8, l - 6);
127 log.info("hour->" + hour);
128 }
129 fileDay = Integer.parseInt(day);
130 fileHour = Integer.parseInt(hour);
131
132 Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
133 recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
134 if (destFile != null) {
135 destFiles.add(destFile);
136 }
137
138 addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
139 } else if (fileName.endsWith(".R.evt")) {
140 if (fileName.charAt(l - 11) == '_') {
141 fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
142 fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
143 fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
144 } else {
145 fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
146 fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
147 fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
148 }
149
150 log.info("fileDay: " + fileDay);
151 log.info("fileHour: " + fileHour);
152 log.info("fileMin: " + fileMin);
153 Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
154 + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
155 + fileHour + "_" + fileMin);
156 if (destFile != null) {
157 destFiles.add(destFile);
158 }
159
160 addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
161 } else {
162 throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
163 }
164 }
165
166 return destFiles;
167 }
168
169 static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
170 String cluster, String dataSource) throws IOException {
171
172 String rollingDirectory = "/chukwa/rolling/";
173
174 Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
175 + "/" + dataSource);
176 if (!fs.exists(path)) {
177 fs.mkdirs(path);
178 }
179
180 if (!isDailyOnly) {
181 path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
182 + cluster + "/" + dataSource);
183 if (!fs.exists(path)) {
184 fs.mkdirs(path);
185 }
186 }
187 }
188
189 static Path writeRecordFile(String destDir, Path recordFile, String fileName)
190 throws IOException {
191 boolean done = false;
192 int count = 1;
193 do {
194 Path destDirPath = new Path(destDir);
195 Path destFilePath = new Path(destDir + "/" + fileName + "." + count
196 + ".evt");
197
198 if (!fs.exists(destDirPath)) {
199 fs.mkdirs(destDirPath);
200 log.info(">>>>>>>>>>>> create Dir" + destDirPath);
201 }
202
203 if (!fs.exists(destFilePath)) {
204 log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
205 + destFilePath);
206 boolean rename = fs.rename(recordFile,destFilePath);
207 done = true;
208 log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
209 return destFilePath;
210 }
211 count++;
212
213 if (count > 1000) {
214 log.warn("too many files in this directory: " + destDir);
215 }
216 } while (!done);
217
218 return null;
219 }
220
221 static boolean checkRotate(String directoryAsString,
222 boolean createDirectoryIfNotExist) throws IOException {
223 Path directory = new Path(directoryAsString);
224 boolean exist = fs.exists(directory);
225
226 if (!exist) {
227 if (createDirectoryIfNotExist == true) {
228 fs.mkdirs(directory);
229 }
230 return false;
231 } else {
232 return fs.exists(new Path(directoryAsString + "/rotateDone"));
233 }
234 }
235
236 public static Path[] doMove(Path srcDir, String destDir) throws Exception {
237 conf = new ChukwaConfiguration();
238 String fsName = conf.get("writer.hdfs.filesystem");
239 fs = FileSystem.get(new URI(fsName), conf);
240 log.info("Start MoveToRepository doMove()");
241
242 FileStatus fstat = fs.getFileStatus(srcDir);
243
244 Collection<Path> destinationFiles = new HashSet<Path>();
245 if (!fstat.isDir()) {
246 throw new IOException(srcDir + " is not a directory!");
247 } else {
248 FileStatus[] clusters = fs.listStatus(srcDir);
249
250 String name = null;
251 for (FileStatus cluster : clusters) {
252 name = cluster.getPath().getName();
253
254 if (name.startsWith("_")) {
255 continue;
256 }
257 log.info("main procesing Cluster (" + cluster.getPath().getName() + ")");
258 destinationFiles.addAll(processClusterDirectory(cluster.getPath(),
259 destDir + "/" + cluster.getPath().getName()));
260
261
262 FileUtil.fullyDelete(fs, cluster.getPath());
263 }
264 }
265
266 log.info("Done with MoveToRepository doMove()");
267 return destinationFiles.toArray(new Path[destinationFiles.size()]);
268 }
269
270
271
272
273
274 public static void main(String[] args) throws Exception {
275
276 Path srcDir = new Path(args[0]);
277 String destDir = args[1];
278 doMove(srcDir, destDir);
279 }
280
281 }