1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29 import org.apache.hadoop.chukwa.util.NagiosHelper;
30 import org.apache.hadoop.chukwa.util.DaemonWatcher;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.fs.PathFilter;
35 import org.apache.hadoop.util.ToolRunner;
36 import org.apache.log4j.Logger;
37
38 public class DemuxManager implements CHUKWA_CONSTANT {
39 static Logger log = Logger.getLogger(DemuxManager.class);
40
41 static int globalErrorcounter = 0;
42 static Date firstErrorTime = null;
43
44 protected int ERROR_SLEEP_TIME = 60;
45 protected int NO_DATASINK_SLEEP_TIME = 20;
46
47 protected int DEFAULT_MAX_ERROR_COUNT = 6;
48 protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
49 protected int DEFAULT_REDUCER_COUNT = 8;
50
51 protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
52 protected int demuxReducerCount = 0;
53 protected ChukwaConfiguration conf = null;
54 protected FileSystem fs = null;
55 protected int reprocess = 0;
56 protected boolean sendAlert = true;
57
58 protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
59 protected volatile boolean isRunning = true;
60
61 final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
62 public boolean accept(Path file) {
63 return file.getName().endsWith(".done");
64 }
65 };
66
67
68 public static void main(String[] args) throws Exception {
69 DaemonWatcher.createInstance("DemuxManager");
70
71 DemuxManager manager = new DemuxManager();
72 manager.start();
73
74 }
75
/**
 * Creates a manager backed by a freshly constructed {@code ChukwaConfiguration}.
 *
 * @throws Exception if the file system handle cannot be initialised
 */
public DemuxManager() throws Exception {
  this(new ChukwaConfiguration());
}
80
/**
 * Creates a manager that reads its settings from the supplied configuration
 * and immediately opens the file system handle through {@code init()}.
 *
 * @param conf configuration to use
 * @throws Exception if the file system handle cannot be initialised
 */
public DemuxManager(ChukwaConfiguration conf) throws Exception {
  this.conf = conf;
  this.init();
}
85
86 protected void init() throws IOException, URISyntaxException {
87 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
88 fs = FileSystem.get(new URI(fsName), conf);
89 }
90
91 public void shutdown() {
92 this.isRunning = false;
93 }
94
95
96 public int getReprocess() {
97 return reprocess;
98 }
99
100
101
102
103
104 public void start() throws Exception {
105
106 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
107 if ( ! chukwaRootDir.endsWith("/") ) {
108 chukwaRootDir += "/";
109 }
110 log.info("chukwaRootDir:" + chukwaRootDir);
111
112 String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
113 String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
114 String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
115 String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
116
117 String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
118 if ( ! dataSinkDir.endsWith("/") ) {
119 dataSinkDir += "/";
120 }
121 log.info("dataSinkDir:" + dataSinkDir);
122
123 String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
124 if ( ! postProcessDir.endsWith("/") ) {
125 postProcessDir += "/";
126 }
127 log.info("postProcessDir:" + postProcessDir);
128
129 String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
130 if ( ! archiveRootDir.endsWith("/") ) {
131 archiveRootDir += "/";
132 }
133 log.info("archiveRootDir:" + archiveRootDir);
134
135 maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
136 DEFAULT_MAX_ERROR_COUNT);
137 demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
138 log.info("demuxReducerCount:" + demuxReducerCount);
139
140 String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
141 int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
142 String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
143
144 log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:"
145 + nagiosPort + ", reportingHost:" + reportingHost);
146
147
148 if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
149 sendAlert = false;
150 log.warn("Alerting is OFF");
151 }
152
153 boolean demuxReady = false;
154
155
156 while (isRunning) {
157 try {
158 demuxReady = false;
159
160 if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
161 log.warn("==================\nToo many errors (" + globalErrorcounter +
162 "), Bail out!\n==================");
163 DaemonWatcher.bailout(-1);
164 }
165
166
167 if (checkDemuxOutputDir(demuxOutputDir) == true) {
168
169 if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
170 log.warn("Cannot delete an existing demux output directory!");
171 throw new IOException("Cannot move demuxOutput to postProcess!");
172 }
173 continue;
174 } else if (checkDemuxInputDir(demuxInputDir) == true) {
175 reprocess++;
176
177
178 if (reprocess > 3) {
179 if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
180 log.warn("Cannot move dataSink files to DemuxErrorDir!");
181 throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
182 }
183 reprocess = 0;
184 continue;
185 }
186
187 log.error("Demux inputDir aready contains some dataSink files,"
188 + " going to reprocess, reprocessCount=" + reprocess);
189 demuxReady = true;
190 } else {
191 reprocess = 0;
192
193 if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
194 demuxReady = true;
195 } else {
196 demuxReady = false;
197 }
198 }
199
200
201 if (demuxReady == true) {
202 boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
203 postProcessDir, archiveRootDir);
204 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
205
206
207 if (demuxStatus) {
208 globalErrorcounter = 0;
209 firstErrorTime = null;
210 }
211 } else {
212 log.info("Demux not ready so going to sleep ...");
213 Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
214 }
215 }catch(Throwable e) {
216 globalErrorcounter ++;
217 if (firstErrorTime == null) firstErrorTime = new Date();
218
219 log.warn("Consecutive error number " + globalErrorcounter +
220 " encountered since " + firstErrorTime, e);
221 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
222 try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
223 catch (InterruptedException e1) {
224 init();
225 }
226 }
227 }
228
229
230
231
232
233
234
235
236
237
238
239 protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
240 String demuxInErrorDir,boolean demuxStatus,String demuxException) {
241
242 if (sendAlert == false) {
243 return;
244 }
245
246 boolean demuxInErrorStatus = true;
247 String demuxInErrorMsg = "";
248 try {
249 Path pDemuxInErrorDir = new Path(demuxInErrorDir);
250 if ( fs.exists(pDemuxInErrorDir)) {
251 FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
252 if (demuxInErrorDirs.length == 0) {
253 demuxInErrorStatus = false;
254 }
255 }
256 } catch (Throwable e) {
257 demuxInErrorMsg = e.getMessage();
258 log.warn(e);
259 }
260
261
262 if (demuxStatus == true) {
263 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux OK",NagiosHelper.NAGIOS_OK);
264 } else {
265 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
266 }
267
268
269 if (demuxInErrorStatus == false) {
270 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError OK",NagiosHelper.NAGIOS_OK);
271 } else {
272 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
273 }
274
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291 protected boolean processData(String dataSinkDir, String demuxInputDir,
292 String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
293
294 boolean demuxStatus = false;
295
296 long startTime = System.currentTimeMillis();
297 demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
298 log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
299
300 if (demuxStatus == false) {
301 log.warn("Demux failed!");
302 } else {
303
304
305 if (checkDemuxOutputDir(demuxOutputDir)) {
306 if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
307 log.warn("Cannot move demuxOutput to postProcess! bail out!");
308 throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
309 }
310 } else {
311 log.warn("Demux processing OK but no output");
312 }
313
314
315 if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
316 log.warn("Cannot move datasinkFile to archive! bail out!");
317 throw new IOException("Cannot move datasinkFile to archive! bail out!");
318 }
319 }
320
321 return demuxStatus;
322 }
323
324
325
326
327
328
329
330
331 protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
332 String[] demuxParams;
333 int i=0;
334 Demux.addParsers(conf);
335 demuxParams = new String[4];
336 demuxParams[i++] = "-r";
337 demuxParams[i++] = "" + demuxReducerCount;
338 demuxParams[i++] = demuxInputDir;
339 demuxParams[i++] = demuxOutputDir;
340 try {
341 return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
342 } catch (Throwable e) {
343 e.printStackTrace();
344 globalErrorcounter ++;
345 if (firstErrorTime == null) firstErrorTime = new Date();
346 log.error("Failed to run demux. Consecutive error number " +
347 globalErrorcounter + " encountered since " + firstErrorTime, e);
348 }
349 return false;
350 }
351
352
353
354
355
356
357
358
359
360
361 protected boolean moveDataSinkFilesToDemuxInputDirectory(
362 String dataSinkDir, String demuxInputDir) throws IOException {
363 Path pDataSinkDir = new Path(dataSinkDir);
364 Path pDemuxInputDir = new Path(demuxInputDir);
365 log.info("dataSinkDir: " + dataSinkDir);
366 log.info("demuxInputDir: " + demuxInputDir);
367
368
369 boolean containsFile = false;
370
371 FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
372 if (dataSinkFiles.length > 0) {
373 setup(pDemuxInputDir);
374 }
375
376 int maxFilesPerDemux = 0;
377 for (FileStatus fstatus : dataSinkFiles) {
378 boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
379 log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
380 maxFilesPerDemux ++;
381 containsFile = true;
382 if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
383 log.info("Max File per Demux reached:" + maxFilesPerDemux);
384 break;
385 }
386 }
387 return containsFile;
388 }
389
390
391
392
393
394
395
396
397
398
399
400 protected boolean moveDataSinkFilesToDemuxErrorDirectory(
401 String dataSinkDir, String demuxErrorDir) throws IOException {
402 demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
403 return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
404 }
405
406
407
408
409
410
411
412
413 protected boolean moveDataSinkFilesToArchiveDirectory(
414 String demuxInputDir, String archiveDirectory) throws IOException {
415 archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
416 return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
417 }
418
419
420
421
422
423
424
425
426 protected boolean moveDemuxOutputDirToPostProcessDirectory(
427 String demuxOutputDir, String postProcessDirectory) throws IOException {
428 return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
429 }
430
431
432
433
434
435
436
437
438 protected boolean checkDemuxInputDir(String demuxInputDir)
439 throws IOException {
440 return dirExists(demuxInputDir);
441 }
442
443
444
445
446
447
448
449 protected boolean checkDemuxOutputDir(String demuxOutputDir)
450 throws IOException {
451 return dirExists(demuxOutputDir);
452 }
453
454
455
456
457
458
459
460
461 protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
462 {
463 return fs.delete(new Path(demuxOutputDir), true);
464 }
465
466
467
468
469
470
471 protected void setup(Path directory) throws IOException {
472 if ( ! fs.exists(directory)) {
473 fs.mkdirs(directory);
474 }
475 }
476
477
478
479
480
481 protected boolean dirExists(String directory) throws IOException {
482 Path pDirectory = new Path(directory);
483 return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
484 }
485
486
487
488
489
490
491
492
493 protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
494 if (!destDir.endsWith("/")) {
495 destDir +="/";
496 }
497 Path pSrcDir = new Path(srcDir);
498 Path pDestDir = new Path(destDir );
499 setup(pDestDir);
500 destDir += prefix +"_" +System.currentTimeMillis();
501 Path pFinalDestDir = new Path(destDir );
502
503 return fs.rename(pSrcDir, pFinalDestDir);
504 }
505 }