View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20  
21  
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24  import java.util.ArrayList;
25  
26  /**
27   * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
28   * records at carriage returns.
29   * 
30   */
31  public class CharFileTailingAdaptorUTF8 extends FileTailingAdaptor {
32  
33    private static final char SEPARATOR = '\n';
34  
35    private ArrayList<Integer> offsets = new ArrayList<Integer>();
36  
37    /**
38     * 
39     * Note: this method uses a temporary ArrayList (shared across instances).
40     * This means we're copying ints each time. This could be a performance issue.
41     * Also, 'offsets' never shrinks, and will be of size proportional to the
42     * largest number of lines ever seen in an event.
43     */
44    @Override
45    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
46        byte[] buf) throws InterruptedException {
47      for (int i = 0; i < buf.length; ++i) {
48        if (buf[i] == SEPARATOR) {
49          offsets.add(i);
50        }
51      }
52  
53      if (offsets.size() > 0) {
54        int[] offsets_i = new int[offsets.size()];
55        for (int i = 0; i < offsets_i.length; ++i)
56          offsets_i[i] = offsets.get(i);
57  
58        int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
59                                                             // offset uses a byte
60        assert bytesUsed > 0 : " shouldn't send empty events";
61        ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),
62            buffOffsetInFile + bytesUsed, buf, this);
63  
64        event.setRecordOffsets(offsets_i);
65        eq.add(event);
66  
67        offsets.clear();
68        return bytesUsed;
69      } else
70        return 0;
71  
72    }
73  
74  }