1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.file.rfile.bcfile;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24
25 /**
26 * Several related classes to support chunk-encoded sub-streams on top of a regular stream.
27 */
28 final class Chunk {
29
/**
 * Private constructor: this class is only a holder for its nested static classes and is never instantiated.
 */
private Chunk() {
  // intentionally empty
}
36
37 /**
38 * Decoding a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder.
39 */
40 static public class ChunkDecoder extends InputStream {
41 private DataInputStream in = null;
42 private boolean lastChunk;
43 private int remain = 0;
44 private boolean closed;
45
46 public ChunkDecoder() {
47 lastChunk = true;
48 closed = true;
49 }
50
51 public void reset(DataInputStream downStream) {
52
53 in = downStream;
54 lastChunk = false;
55 remain = 0;
56 closed = false;
57 }
58
59 /**
60 * Constructor
61 *
62 * @param in
63 * The source input stream which contains chunk-encoded data stream.
64 */
65 public ChunkDecoder(DataInputStream in) {
66 this.in = in;
67 lastChunk = false;
68 closed = false;
69 }
70
71 /**
72 * Have we reached the last chunk.
73 *
74 * @return true if we have reached the last chunk.
75 * @throws java.io.IOException
76 */
77 public boolean isLastChunk() throws IOException {
78 checkEOF();
79 return lastChunk;
80 }
81
82 /**
83 * How many bytes remain in the current chunk?
84 *
85 * @return remaining bytes left in the current chunk.
86 * @throws java.io.IOException
87 */
88 public int getRemain() throws IOException {
89 checkEOF();
90 return remain;
91 }
92
93 /**
94 * Reading the length of next chunk.
95 *
96 * @throws java.io.IOException
97 * when no more data is available.
98 */
99 private void readLength() throws IOException {
100 remain = Utils.readVInt(in);
101 if (remain >= 0) {
102 lastChunk = true;
103 } else {
104 remain = -remain;
105 }
106 }
107
108 /**
109 * Check whether we reach the end of the stream.
110 *
111 * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise.
112 * @throws java.io.IOException
113 * on I/O errors.
114 */
115 private boolean checkEOF() throws IOException {
116 if (isClosed())
117 return true;
118 while (true) {
119 if (remain > 0)
120 return false;
121 if (lastChunk)
122 return true;
123 readLength();
124 }
125 }
126
127 @Override
128
129
130
131 public int available() {
132 return remain;
133 }
134
135 @Override
136 public int read() throws IOException {
137 if (checkEOF())
138 return -1;
139 int ret = in.read();
140 if (ret < 0)
141 throw new IOException("Corrupted chunk encoding stream");
142 --remain;
143 return ret;
144 }
145
146 @Override
147 public int read(byte[] b) throws IOException {
148 return read(b, 0, b.length);
149 }
150
151 @Override
152 public int read(byte[] b, int off, int len) throws IOException {
153 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
154 throw new IndexOutOfBoundsException();
155 }
156
157 if (!checkEOF()) {
158 int n = Math.min(remain, len);
159 int ret = in.read(b, off, n);
160 if (ret < 0)
161 throw new IOException("Corrupted chunk encoding stream");
162 remain -= ret;
163 return ret;
164 }
165 return -1;
166 }
167
168 @Override
169 public long skip(long n) throws IOException {
170 if (!checkEOF()) {
171 long ret = in.skip(Math.min(remain, n));
172 remain -= ret;
173 return ret;
174 }
175 return 0;
176 }
177
178 @Override
179 public boolean markSupported() {
180 return false;
181 }
182
183 public boolean isClosed() {
184 return closed;
185 }
186
187 @Override
188 public void close() throws IOException {
189 if (closed == false) {
190 try {
191 while (!checkEOF()) {
192 skip(Integer.MAX_VALUE);
193 }
194 } finally {
195 closed = true;
196 }
197 }
198 }
199 }
200
201 /**
202 * Chunk Encoder. Encoding the output data into a chain of chunks in the following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n, byte[len_n].
203 * Where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks cannot have length 0.
204 * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format.
205 */
206 static public class ChunkEncoder extends OutputStream {
207 /**
208 * The data output stream it connects to.
209 */
210 private DataOutputStream out;
211
212 /**
213 * The internal buffer that is only used when we do not know the advertised size.
214 */
215 private byte buf[];
216
217 /**
218 * The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt> through
219 * <tt>buf[count-1]</tt> contain valid byte data.
220 */
221 private int count;
222
223 /**
224 * Constructor.
225 *
226 * @param out
227 * the underlying output stream.
228 * @param buf
229 * user-supplied buffer. The buffer would be used exclusively by the ChunkEncoder during its life cycle.
230 */
231 public ChunkEncoder(DataOutputStream out, byte[] buf) {
232 this.out = out;
233 this.buf = buf;
234 this.count = 0;
235 }
236
237 /**
238 * Write out a chunk.
239 *
240 * @param chunk
241 * The chunk buffer.
242 * @param offset
243 * Offset to chunk buffer for the beginning of chunk.
244 * @param len
245 * @param last
246 * Is this the last call to flushBuffer?
247 */
248 private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException {
249 if (last) {
250 Utils.writeVInt(out, len);
251 if (len > 0) {
252 out.write(chunk, offset, len);
253 }
254 } else {
255 if (len > 0) {
256 Utils.writeVInt(out, -len);
257 out.write(chunk, offset, len);
258 }
259 }
260 }
261
262 /**
263 * Write out a chunk that is a concatenation of the internal buffer plus user supplied data. This will never be the last block.
264 *
265 * @param data
266 * User supplied data buffer.
267 * @param offset
268 * Offset to user data buffer.
269 * @param len
270 * User data buffer size.
271 */
272 private void writeBufData(byte[] data, int offset, int len) throws IOException {
273 if (count + len > 0) {
274 Utils.writeVInt(out, -(count + len));
275 out.write(buf, 0, count);
276 count = 0;
277 out.write(data, offset, len);
278 }
279 }
280
281 /**
282 * Flush the internal buffer.
283 *
284 * Is this the last call to flushBuffer?
285 *
286 * @throws java.io.IOException
287 */
288 private void flushBuffer() throws IOException {
289 if (count > 0) {
290 writeChunk(buf, 0, count, false);
291 count = 0;
292 }
293 }
294
295 @Override
296 public void write(int b) throws IOException {
297 if (count >= buf.length) {
298 flushBuffer();
299 }
300 buf[count++] = (byte) b;
301 }
302
303 @Override
304 public void write(byte b[]) throws IOException {
305 write(b, 0, b.length);
306 }
307
308 @Override
309 public void write(byte b[], int off, int len) throws IOException {
310 if ((len + count) >= buf.length) {
311
312
313
314
315 writeBufData(b, off, len);
316 return;
317 }
318
319 System.arraycopy(b, off, buf, count, len);
320 count += len;
321 }
322
323 @Override
324 public void flush() throws IOException {
325 flushBuffer();
326 out.flush();
327 }
328
329 @Override
330 public void close() throws IOException {
331 if (buf != null) {
332 try {
333 writeChunk(buf, 0, count, true);
334 } finally {
335 buf = null;
336 out = null;
337 }
338 }
339 }
340 }
341
342 /**
343 * Encode the whole stream as a single chunk. Expecting to know the size of the chunk up-front.
344 */
345 static public class SingleChunkEncoder extends OutputStream {
346 /**
347 * The data output stream it connects to.
348 */
349 private final DataOutputStream out;
350
351 /**
352 * The remaining bytes to be written.
353 */
354 private int remain;
355 private boolean closed = false;
356
357 /**
358 * Constructor.
359 *
360 * @param out
361 * the underlying output stream.
362 * @param size
363 * The total # of bytes to be written as a single chunk.
364 * @throws java.io.IOException
365 * if an I/O error occurs.
366 */
367 public SingleChunkEncoder(DataOutputStream out, int size) throws IOException {
368 this.out = out;
369 this.remain = size;
370 Utils.writeVInt(out, size);
371 }
372
373 @Override
374 public void write(int b) throws IOException {
375 if (remain > 0) {
376 out.write(b);
377 --remain;
378 } else {
379 throw new IOException("Writing more bytes than advertised size.");
380 }
381 }
382
383 @Override
384 public void write(byte b[]) throws IOException {
385 write(b, 0, b.length);
386 }
387
388 @Override
389 public void write(byte b[], int off, int len) throws IOException {
390 if (remain >= len) {
391 out.write(b, off, len);
392 remain -= len;
393 } else {
394 throw new IOException("Writing more bytes than advertised size.");
395 }
396 }
397
398 @Override
399 public void flush() throws IOException {
400 out.flush();
401 }
402
403 @Override
404 public void close() throws IOException {
405 if (closed == true) {
406 return;
407 }
408
409 try {
410 if (remain > 0) {
411 throw new IOException("Writing less bytes than advertised size.");
412 }
413 } finally {
414 closed = true;
415 }
416 }
417 }
418 }