1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21
22 import org.apache.hadoop.chukwa.ChunkImpl;
23 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24 import org.apache.hadoop.chukwa.util.RecordConstants;
25 import java.util.ArrayList;
26
27
28
29
30
31
32 public class CharFileTailingAdaptorUTF8NewLineEscaped extends
33 FileTailingAdaptor {
34
35 private static final char SEPARATOR = '\n';
36
37 private ArrayList<Integer> offsets = new ArrayList<Integer>();
38
39
40
41
42
43
44
45
46 @Override
47 protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
48 byte[] buf) throws InterruptedException {
49 String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
50 for (int i = 0; i < buf.length; ++i) {
51
52 if (buf[i] == SEPARATOR) {
53
54 boolean escaped = false;
55 if (i - es.length() >= 0) {
56 escaped = true;
57
58 for (int j = 0; j < es.length(); j++) {
59 if (buf[i - es.length() + j] != es.charAt(j)) {
60 escaped = false;
61 }
62 }
63 }
64 if (!escaped) {
65 offsets.add(i);
66 }
67 }
68 }
69
70 if (offsets.size() > 0) {
71 int[] offsets_i = new int[offsets.size()];
72 for (int i = 0; i < offsets_i.length; ++i)
73 offsets_i[i] = offsets.get(i);
74
75 int bytesUsed = offsets_i[offsets_i.length - 1] + 1;
76
77 assert bytesUsed > 0 : " shouldn't send empty events";
78 ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
79 buffOffsetInFile + bytesUsed, buf, this);
80
81 chunk.setSeqID(buffOffsetInFile + bytesUsed);
82 chunk.setRecordOffsets(offsets_i);
83 eq.add(chunk);
84
85 offsets.clear();
86 return bytesUsed;
87 } else
88 return 0;
89 }
90
91 public String toString() {
92 return "escaped newline CFTA-UTF8";
93 }
94
95 }