/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * A base class for file tailing adaptors.
 * Intended to mandate as little policy as possible, and to use as
 * few system resources as possible.
 *
 * If the file does not exist, this class will continue to retry quietly
 * forever and will start tailing if it's eventually created.
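 *
 * A common way to start the adaptor is through the Chukwa agent's control
 * interface; the data type and path below are hypothetical:
 * <pre>
 *   add filetailer.LWFTAdaptor MyDataType /var/log/app.log 0
 * </pre>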
 */
public class LWFTAdaptor extends AbstractAdaptor {

  /**
   * This is the maximum amount we'll read from any one file before moving on
   * to the next. This way, we get quick response time for other files if one
   * file is growing rapidly.
   *
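   * The limit defaults to 128 KB and is configurable through the
   * chukwaAgent.fileTailingAdaptor.maxReadSize agent option, which is read
   * in parseArgs() below.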
   */
  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
  public static final String MAX_READ_SIZE_OPT =
      "chukwaAgent.fileTailingAdaptor.maxReadSize";

  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;

  static Logger log;
  protected static FileTailer tailer;

  static {
    tailer = null;
    log = Logger.getLogger(LWFTAdaptor.class);
  }

  /**
   * next PHYSICAL offset to read
   */
  protected long fileReadOffset;

  /**
   * The logical offset of the first byte of the file
   */
  protected long offsetOfFirstByte = 0;
  protected Configuration conf = null;

  File toWatch;

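  /**
   * Begin tailing. Note that {@code offset} is a logical stream offset; the
   * physical read position is recovered by subtracting offsetOfFirstByte.
   */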
  @Override
  public void start(long offset) {
    synchronized(LWFTAdaptor.class) {
      if(tailer == null)
        tailer = new FileTailer(control.getConfiguration());
    }
    this.fileReadOffset = offset - offsetOfFirstByte;
    tailer.startWatchingFile(this);
  }

  /**
   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
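   *
   * For a hypothetical adaptor of type "MyDataType" watching /var/log/app.log
   * from offset zero, this returns "MyDataType 0 /var/log/app.log".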
   */
  public String getCurrentStatus() {
    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
  }

  public String toString() {
    return "Lightweight Tailer on " + toWatch;
  }

  public String getStreamName() {
    return toWatch.getPath();
  }

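  /**
   * Parse the adaptor parameters: an optional first-byte offset followed by
   * the path of the file to tail. For example (with an illustrative path),
   * "3000 /var/log/app.log" starts the logical stream at byte 3000, whereas
   * "/var/log/app.log" alone starts it at zero.
   *
   * @return the absolute path of the file to watch
   */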
  @Override
  public String parseArgs(String params) {
    conf = control.getConfiguration();
    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);

    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
    Matcher m = cmd.matcher(params);
    if (m.matches()) { // check for a first-byte offset; if absent, assume we just got a path
      offsetOfFirstByte = Long.parseLong(m.group(1));
      toWatch = new File(m.group(2));
    } else {
      toWatch = new File(params.trim());
    }
    return toWatch.getAbsolutePath();
  }

  @Override
  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
      throws AdaptorException {
    tailer.stopWatchingFile(this);
    return fileReadOffset + offsetOfFirstByte;
  }

  /**
   * Extract records from a byte sequence.
   *
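   * This base implementation emits the entire buffer as one chunk; subclasses
   * are expected to override it when data should be split on record
   * boundaries, as the character-oriented tailing adaptors in this package do.
   *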
   * @param eq the queue to stick the new chunk(s) in
   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
   * @param buf the byte buffer to extract records from
   * @return the number of bytes processed
   * @throws InterruptedException if interrupted while enqueueing a chunk
   */
  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
      byte[] buf) throws InterruptedException {
    if(buf.length == 0)
      return 0;

    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
        buffOffsetInFile + buf.length, buf, this);

    eq.add(chunk);
    return buf.length;
  }

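  /**
   * Read from the current physical offset up to {@code len} bytes, clamped to
   * MAX_READ_SIZE, and hand the buffer to extractRecords(). A worked example
   * with made-up numbers: if fileReadOffset is 0, len is 300 * 1024, and
   * MAX_READ_SIZE is the default 128 KB, only the first 128 KB are read and
   * the method returns true so the caller knows to come back for the rest.
   *
   * @param len the current length of the file, as measured by the caller
   * @param reader an open reader on the watched file; this method seeks to
   *          fileReadOffset before reading
   * @return true if known-unread data remains in the file
   * @throws IOException if reading the file fails
   * @throws InterruptedException if interrupted while enqueueing a chunk
   */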
  protected boolean slurp(long len, RandomAccessFile reader)
      throws IOException, InterruptedException {
    boolean hasMoreData = false;

    log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
    reader.seek(fileReadOffset);

    long bufSize = len - fileReadOffset;

    if (bufSize > MAX_READ_SIZE) {
      bufSize = MAX_READ_SIZE;
      hasMoreData = true;
    }
    byte[] buf = new byte[(int) bufSize];

    long curOffset = fileReadOffset;

    int bufferRead = reader.read(buf);
    assert reader.getFilePointer() == fileReadOffset + bufSize :
        "event size arithmetic is broken: pointer is " + reader.getFilePointer()
        + " but offset is " + (fileReadOffset + bufSize);

    int bytesUsed = extractRecords(dest,
        fileReadOffset + offsetOfFirstByte, buf);

    // === WARNING ===
    // If we couldn't find a complete record AND we cannot read any further,
    // i.e. bufferRead == MAX_READ_SIZE, then the record is too BIG.
    // So log a warning and drop the current buffer, so that we keep moving
    // instead of being stuck at this point forever.
    if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
      log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
          + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE + ", for "
          + toWatch.getPath());
      bytesUsed = buf.length;
    }

    fileReadOffset = fileReadOffset + bytesUsed;

    log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
        + fileReadOffset);
    return hasMoreData;
  }

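  /**
   * Called periodically by the FileTailer thread: compares the file's current
   * length with fileReadOffset, then either handles a shrunken file or slurps
   * any newly appended bytes.
   *
   * @return true if there is known unread data in the file
   */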
  public synchronized boolean tailFile()
      throws InterruptedException {
    boolean hasMoreData = false;
    try {
      // if the file doesn't exist, length = 0 and we just keep waiting for it
      // if(!toWatch.exists())
      //   deregisterAndStop(false);

      long len = toWatch.length();
      if(len < fileReadOffset) {
        // file shrank; probably some data went missing
        handleShrunkenFile(len);
      } else if(len > fileReadOffset) {
        RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
        try {
          // propagate slurp()'s result so the caller learns about leftover data
          hasMoreData = slurp(len, reader);
        } finally {
          reader.close();
        }
      }
    } catch(IOException e) {
      log.warn("IOException in tailer", e);
      deregisterAndStop();
    }

    return hasMoreData;
  }

  private void handleShrunkenFile(long measuredLen) {
    log.info("file " + toWatch + " shrank from " + fileReadOffset + " to "
        + measuredLen);
    offsetOfFirstByte = measuredLen;
    fileReadOffset = 0;
  }

}