View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20  
21  
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24  import org.apache.hadoop.chukwa.util.RecordConstants;
25  import java.util.ArrayList;
26  
27  /**
28   * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
29   * records at non-escaped carriage returns
30   * 
31   */
32  public class CharFileTailingAdaptorUTF8NewLineEscaped extends
33      FileTailingAdaptor {
34  
35    private static final char SEPARATOR = '\n';
36  
37    private ArrayList<Integer> offsets = new ArrayList<Integer>();
38  
39    /**
40     * 
41     * Note: this method uses a temporary ArrayList (shared across instances).
42     * This means we're copying ints each time. This could be a performance issue.
43     * Also, 'offsets' never shrinks, and will be of size proportional to the
44     * largest number of lines ever seen in an event.
45     */
46    @Override
47    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
48        byte[] buf) throws InterruptedException {
49      String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
50      for (int i = 0; i < buf.length; ++i) {
51        // if this is a separator
52        if (buf[i] == SEPARATOR) {
53          // if possibly preceded by escape sequence (avoid outOfBounds here)
54          boolean escaped = false; // was it escaped?
55          if (i - es.length() >= 0) {
56            escaped = true; // maybe (at least there was room for an escape
57                            // sequence, so let's check for the e.s.)
58            for (int j = 0; j < es.length(); j++) {
59              if (buf[i - es.length() + j] != es.charAt(j)) {
60                escaped = false;
61              }
62            }
63          }
64          if (!escaped) {
65            offsets.add(i);
66          }
67        }
68      }
69  
70      if (offsets.size() > 0) {
71        int[] offsets_i = new int[offsets.size()];
72        for (int i = 0; i < offsets_i.length; ++i)
73          offsets_i[i] = offsets.get(i);
74        // make the stream unique to this adaptor
75        int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
76                                                             // offset uses a byte
77        assert bytesUsed > 0 : " shouldn't send empty events";
78        ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
79            buffOffsetInFile + bytesUsed, buf, this);
80  
81        chunk.setSeqID(buffOffsetInFile + bytesUsed);
82        chunk.setRecordOffsets(offsets_i);
83        eq.add(chunk);
84  
85        offsets.clear();
86        return bytesUsed;
87      } else
88        return 0;
89    }
90  
91    public String toString() {
92      return "escaped newline CFTA-UTF8";
93    }
94  
95  }