1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
24 import org.apache.hadoop.chukwa.util.ExceptionUtil;
25
26
27
28
29
30
31
32 public class FileTailingAdaptor extends LWFTAdaptor {
33
34
35 public static int MAX_RETRIES = 300;
36 public static int GRACEFUL_PERIOD = 3 * 60 * 1000;
37
38 private int attempts = 0;
39 private long gracefulPeriodExpired = 0l;
40 private boolean adaptorInError = false;
41
42 protected RandomAccessFile reader = null;
43
44 public void start(long bytes) {
45 super.start(bytes);
46 log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
47 this.attempts = 0;
48
49 log.info("started file tailer " + adaptorID + " on file " + toWatch
50 + " with first byte at offset " + offsetOfFirstByte);
51 }
52
53
54 @Override
55 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
56
57 log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
58
59 switch(shutdownPolicy) {
60 case GRACEFULLY :
61 case WAIT_TILL_FINISHED :{
62 if (toWatch.exists()) {
63 int retry = 0;
64 tailer.stopWatchingFile(this);
65 TerminatorThread lastTail = new TerminatorThread(this);
66 lastTail.setDaemon(true);
67 lastTail.start();
68
69 if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
70 while (lastTail.isAlive() && retry < 60) {
71 try {
72 log.info("GRACEFULLY Retry:" + retry);
73 Thread.sleep(1000);
74 retry++;
75 } catch (InterruptedException ex) {
76 }
77 }
78 } else {
79 while (lastTail.isAlive()) {
80 try {
81 if (retry%100 == 0) {
82 log.info("WAIT_TILL_FINISHED Retry:" + retry);
83 }
84 Thread.sleep(1000);
85 retry++;
86 } catch (InterruptedException ex) {
87 }
88 }
89 }
90 }
91 }
92 break;
93
94 case HARD_STOP:
95 default:
96 tailer.stopWatchingFile(this);
97 try {
98 if (reader != null) {
99 reader.close();
100 }
101 reader = null;
102 } catch(Throwable e) {
103 log.warn("Exception while closing reader:",e);
104 }
105 break;
106 }
107 log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
108 return fileReadOffset + offsetOfFirstByte;
109 }
110
111
112
113
114
115
116
117
118
119 @Override
120 public synchronized boolean tailFile()
121 throws InterruptedException {
122 boolean hasMoreData = false;
123
124 try {
125 if ((adaptorInError == true)
126 && (System.currentTimeMillis() > gracefulPeriodExpired)) {
127 if (!toWatch.exists()) {
128 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
129 + "| File does not exist: " + toWatch.getAbsolutePath()
130 + ", streaming policy expired. File removed from streaming.");
131 } else if (!toWatch.canRead()) {
132 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
133 + "| File cannot be read: " + toWatch.getAbsolutePath()
134 + ", streaming policy expired. File removed from streaming.");
135 } else {
136
137 adaptorInError = false;
138 gracefulPeriodExpired = 0L;
139 attempts = 0;
140 return false;
141 }
142
143 deregisterAndStop();
144 return false;
145 } else if (!toWatch.exists() || !toWatch.canRead()) {
146 if (adaptorInError == false) {
147 long now = System.currentTimeMillis();
148 gracefulPeriodExpired = now + GRACEFUL_PERIOD;
149 adaptorInError = true;
150 attempts = 0;
151 log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
152 + ", graceful period will Expire at now:" + now + " + "
153 + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
154 } else if (attempts % 10 == 0) {
155 log.info("failed to stream data for: " + toWatch.getAbsolutePath()
156 + ", attempt: " + attempts);
157 }
158
159 attempts++;
160 return false;
161 }
162
163 if (reader == null) {
164 reader = new RandomAccessFile(toWatch, "r");
165 log.info("Adaptor|" + adaptorID
166 + "|Opening the file for the first time|seek|" + fileReadOffset);
167 }
168
169 long len = 0L;
170 try {
171 RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
172 len = reader.length();
173 long newLength = newReader.length();
174 if (newLength < len && fileReadOffset >= len) {
175 if (reader != null) {
176 reader.close();
177 }
178
179 reader = newReader;
180 fileReadOffset = 0L;
181 log.debug("Adaptor|"+ adaptorID + "| File size mismatched, rotating: "
182 + toWatch.getAbsolutePath());
183 } else {
184 try {
185 if (newReader != null) {
186 newReader.close();
187 }
188 newReader =null;
189 } catch (Throwable e) {
190 log.debug(ExceptionUtil.getStackTrace(e));
191 }
192 }
193 } catch (IOException e) {
194 log.debug(ExceptionUtil.getStackTrace(e));
195 }
196 if (len >= fileReadOffset) {
197 if (offsetOfFirstByte > fileReadOffset) {
198
199
200
201 fileReadOffset = 0;
202 offsetOfFirstByte = 0L;
203 log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
204 }
205 hasMoreData = slurp(len, reader);
206
207 } else {
208
209 if (reader != null) {
210 reader.close();
211 }
212
213 reader = null;
214 fileReadOffset = 0L;
215 offsetOfFirstByte = 0L;
216 hasMoreData = true;
217 log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
218 + ", has rotated and no detection - reset counters to 0L");
219 }
220 } catch (IOException e) {
221 log.warn("failure reading " + toWatch, e);
222 }
223 attempts = 0;
224 adaptorInError = false;
225 return hasMoreData;
226 }
227
228
229 }