1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.archive;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.text.SimpleDateFormat;
25
26 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
28 import org.apache.hadoop.chukwa.util.DaemonWatcher;
29 import org.apache.hadoop.fs.FileStatus;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.util.ToolRunner;
33 import org.apache.log4j.Logger;
34
35 public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
36 static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
37 static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
38
39 static final int ONE_HOUR = 60 * 60 * 1000;
40 static final int ONE_DAY = 24*ONE_HOUR;
41 static final int MAX_FILES = 500;
42
43 private static final int DEFAULT_MAX_ERROR_COUNT = 4;
44
45 protected ChukwaConfiguration conf = null;
46 protected FileSystem fs = null;
47 protected boolean isRunning = true;
48
49 public ChukwaArchiveManager() throws Exception {
50 conf = new ChukwaConfiguration();
51 init();
52 }
53
54 protected void init() throws IOException, URISyntaxException {
55 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
56 fs = FileSystem.get(new URI(fsName), conf);
57 }
58
59 public static void main(String[] args) throws Exception {
60 DaemonWatcher.createInstance("ArchiveManager");
61
62 ChukwaArchiveManager manager = new ChukwaArchiveManager();
63 manager.start();
64 }
65
66 public void shutdown() {
67 this.isRunning = false;
68 }
69
70 public void start() throws Exception {
71
72 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
73 if ( ! chukwaRootDir.endsWith("/") ) {
74 chukwaRootDir += "/";
75 }
76 log.info("chukwaRootDir:" + chukwaRootDir);
77
78 String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
79 if ( ! archiveRootDir.endsWith("/") ) {
80 archiveRootDir += "/";
81 }
82 log.info("archiveDir:" + archiveRootDir);
83 Path pArchiveRootDir = new Path(archiveRootDir);
84 setup(pArchiveRootDir);
85
86 String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
87
88 String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
89 String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
90 String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
91
92 int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
93 DEFAULT_MAX_ERROR_COUNT);
94
95 Path pDailyRawArchivesInput = new Path(archiveRootDir);
96 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
97 Path pArchivesRootProcessingDir = new Path(archivesRootProcessingDir);
98 Path pFinalArchiveOutput = new Path(finalArchiveOutput);
99
100
101 if (!archivesMRInputDir.endsWith("/")) {
102 archivesMRInputDir +="/";
103 }
104 setup( pArchivesRootProcessingDir );
105 setup( pDailyRawArchivesInput );
106 setup( pFinalArchiveOutput );
107
108 int errorCount = 0;
109
110 long lastRun = 0l;
111
112 while (isRunning) {
113 try {
114
115 if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
116 log.warn("==================\nToo many errors (" + errorCount +
117 "), Bail out!\n==================");
118 DaemonWatcher.bailout(-1);
119 }
120
121
122
123
124 if (fs.exists(pArchivesMRInputDir)) {
125 FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
126 if (days.length > 0) {
127 log.info("reprocessing current Archive input" + days[0].getPath());
128
129 runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);
130 errorCount = 0;
131 continue;
132 }
133 }
134
135
136 log.info("Raw Archive dir:" + pDailyRawArchivesInput);
137 long now = System.currentTimeMillis();
138 int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
139 FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);
140
141 if (daysInRawArchiveDir.length == 0 ) {
142 log.debug( pDailyRawArchivesInput + " is empty, going to sleep for 1 minute");
143 Thread.sleep(1 * 60 * 1000);
144 continue;
145 }
146
147
148 if (daysInRawArchiveDir.length == 1 ) {
149 int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
150 long nextRun = lastRun + (2*ONE_HOUR) - (1*60*1000);
151 if (workingDay == currentDay && now < nextRun) {
152 log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
153 Thread.sleep(30 * 60 * 1000);
154 continue;
155 }
156 }
157
158 String dayArchivesMRInputDir = null;
159 for (FileStatus fsDay : daysInRawArchiveDir) {
160 dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
161 processDay(fsDay, dayArchivesMRInputDir,archivesMROutputDir, finalArchiveOutput);
162 lastRun = now;
163 }
164
165 }catch (Throwable e) {
166 errorCount ++;
167 e.printStackTrace();
168 log.warn(e);
169 }
170
171 }
172
173 }
174
175 public void processDay(FileStatus fsDay, String archivesMRInputDir,
176 String archivesMROutputDir,String finalArchiveOutput) throws Exception {
177 FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(fsDay.getPath());
178 long now = System.currentTimeMillis();
179
180 int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
181 int workingDay = Integer.parseInt(fsDay.getPath().getName());
182
183 long oneHourAgo = now - ONE_HOUR;
184 if (dataSinkDirsInRawArchiveDir.length == 0 && workingDay < currentDay) {
185 fs.delete(fsDay.getPath(),false);
186 log.info("deleting raw dataSink dir for day:" + fsDay.getPath().getName());
187 return;
188 }
189
190 int fileCount = 0;
191 for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
192 long modificationDate = fsDataSinkDir.getModificationTime();
193 if (modificationDate < oneHourAgo || workingDay < currentDay) {
194 log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
195 fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
196 moveDataSinkFilesToArchiveMrInput(fsDataSinkDir,archivesMRInputDir);
197
198 if (fileCount >= MAX_FILES) {
199 log.info("processDay, reach capacity");
200 runArchive(archivesMRInputDir,archivesMROutputDir,finalArchiveOutput);
201 fileCount = 0;
202 } else {
203 log.info("processDay,modificationDate:" + modificationDate +", skipping: " + fsDataSinkDir.getPath() );
204 }
205 }
206 }
207 }
208
209 public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
210 String finalArchiveOutput) throws Exception {
211 String[] args = new String[3];
212
213
214 args[0] = conf.get("archive.grouper","Stream");
215 args[1] = archivesMRInputDir + "*/*.done" ;
216 args[2] = archivesMROutputDir;
217
218 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
219 Path pArchivesMROutputDir = new Path(archivesMROutputDir);
220
221
222 if (fs.exists(pArchivesMROutputDir)) {
223 log.warn("Deleteing mroutput dir for archive ...");
224 fs.delete(pArchivesMROutputDir, true);
225 }
226
227 log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
228 int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(),args);
229 log.info("Archive result: " + res);
230 if (res != 0) {
231 throw new Exception("Archive result != 0");
232 }
233
234 if (!finalArchiveOutput.endsWith("/")) {
235 finalArchiveOutput +="/";
236 }
237 String day = pArchivesMRInputDir.getName();
238 finalArchiveOutput += day;
239 Path pDay = new Path(finalArchiveOutput);
240 setup(pDay);
241
242 finalArchiveOutput += "/archive_" + System.currentTimeMillis();
243 Path pFinalArchiveOutput = new Path(finalArchiveOutput);
244
245 log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);
246
247 if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
248 log.info("deleting " + pArchivesMRInputDir);
249 fs.delete(pArchivesMRInputDir, true);
250 } else {
251 log.warn("move to final archive folder failed!");
252 }
253
254
255
256 }
257
258 public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
259 String archivesMRInputDir) throws IOException {
260
261 if (!archivesMRInputDir.endsWith("/")) {
262 archivesMRInputDir +="/";
263 }
264
265 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
266 setup(pArchivesMRInputDir);
267 fs.rename(fsDataSinkDir.getPath(), pArchivesMRInputDir);
268 log.info("moving " + fsDataSinkDir.getPath() + " to " + pArchivesMRInputDir);
269 }
270
271
272
273
274
275
276 protected void setup(Path directory) throws IOException {
277 if ( ! fs.exists(directory)) {
278 fs.mkdirs(directory);
279 }
280 }
281
282 }