1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21
22 import java.util.List;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.log4j.Logger;
26
27
28
29
30
31
32
33
34
35
36 class FileTailer extends Thread {
37 static Logger log = Logger.getLogger(FileTailer.class);
38
39 private List<LWFTAdaptor> adaptors;
40 private volatile boolean isRunning = true;
41
42
43
44
45
46 int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
47 int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
48
49 public static final int MAX_SAMPLE_PERIOD = 60 * 1000;
50
51 FileTailer(Configuration conf) {
52
53 SAMPLE_PERIOD_MS = conf.getInt(
54 "chukwaAgent.adaptor.context.switch.time",
55 DEFAULT_SAMPLE_PERIOD_MS);
56
57
58
59 adaptors = new CopyOnWriteArrayList<LWFTAdaptor>();
60
61 this.setDaemon(true);
62 start();
63 }
64
65
66 void startWatchingFile(LWFTAdaptor f) {
67 adaptors.add(f);
68 }
69
70
71 void stopWatchingFile(LWFTAdaptor f) {
72 adaptors.remove(f);
73 }
74
75 public void run() {
76 while (isRunning) {
77 try {
78 boolean shouldISleep = true;
79 long startTime = System.currentTimeMillis();
80 for (LWFTAdaptor f : adaptors) {
81 boolean hasMoreData = f.tailFile();
82 shouldISleep &= !hasMoreData;
83 }
84 long timeToReadFiles = System.currentTimeMillis() - startTime;
85 if(timeToReadFiles > MAX_SAMPLE_PERIOD)
86 log.warn("took " + timeToReadFiles + " ms to check all files being tailed");
87 if (timeToReadFiles < SAMPLE_PERIOD_MS || shouldISleep) {
88 Thread.sleep(SAMPLE_PERIOD_MS);
89 }
90 } catch (Throwable e) {
91 log.warn("Exception in FileTailer, while loop", e);
92 e.printStackTrace();
93 }
94 }
95 }
96
97 }