View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
17  package org.apache.accumulo.core.file.rfile.bcfile;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  /**
26   * Several related classes to support chunk-encoded sub-streams on top of a regular stream.
27   */
28  final class Chunk {
29    
30    /**
31     * Prevent the instantiation of class.
32     */
33    private Chunk() {
34      // nothing
35    }
36    
37    /**
38     * Decoding a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder.
39     */
40    static public class ChunkDecoder extends InputStream {
41      private DataInputStream in = null;
42      private boolean lastChunk;
43      private int remain = 0;
44      private boolean closed;
45      
46      public ChunkDecoder() {
47        lastChunk = true;
48        closed = true;
49      }
50      
51      public void reset(DataInputStream downStream) {
52        // no need to wind forward the old input.
53        in = downStream;
54        lastChunk = false;
55        remain = 0;
56        closed = false;
57      }
58      
59      /**
60       * Constructor
61       * 
62       * @param in
63       *          The source input stream which contains chunk-encoded data stream.
64       */
65      public ChunkDecoder(DataInputStream in) {
66        this.in = in;
67        lastChunk = false;
68        closed = false;
69      }
70      
71      /**
72       * Have we reached the last chunk.
73       * 
74       * @return true if we have reached the last chunk.
75       * @throws java.io.IOException
76       */
77      public boolean isLastChunk() throws IOException {
78        checkEOF();
79        return lastChunk;
80      }
81      
82      /**
83       * How many bytes remain in the current chunk?
84       * 
85       * @return remaining bytes left in the current chunk.
86       * @throws java.io.IOException
87       */
88      public int getRemain() throws IOException {
89        checkEOF();
90        return remain;
91      }
92      
93      /**
94       * Reading the length of next chunk.
95       * 
96       * @throws java.io.IOException
97       *           when no more data is available.
98       */
99      private void readLength() throws IOException {
100       remain = Utils.readVInt(in);
101       if (remain >= 0) {
102         lastChunk = true;
103       } else {
104         remain = -remain;
105       }
106     }
107     
108     /**
109      * Check whether we reach the end of the stream.
110      * 
111      * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise.
112      * @throws java.io.IOException
113      *           on I/O errors.
114      */
115     private boolean checkEOF() throws IOException {
116       if (isClosed())
117         return true;
118       while (true) {
119         if (remain > 0)
120           return false;
121         if (lastChunk)
122           return true;
123         readLength();
124       }
125     }
126     
127     @Override
128     /*
129      * This method never blocks the caller. Returning 0 does not mean we reach the end of the stream.
130      */
131     public int available() {
132       return remain;
133     }
134     
135     @Override
136     public int read() throws IOException {
137       if (checkEOF())
138         return -1;
139       int ret = in.read();
140       if (ret < 0)
141         throw new IOException("Corrupted chunk encoding stream");
142       --remain;
143       return ret;
144     }
145     
146     @Override
147     public int read(byte[] b) throws IOException {
148       return read(b, 0, b.length);
149     }
150     
151     @Override
152     public int read(byte[] b, int off, int len) throws IOException {
153       if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
154         throw new IndexOutOfBoundsException();
155       }
156       
157       if (!checkEOF()) {
158         int n = Math.min(remain, len);
159         int ret = in.read(b, off, n);
160         if (ret < 0)
161           throw new IOException("Corrupted chunk encoding stream");
162         remain -= ret;
163         return ret;
164       }
165       return -1;
166     }
167     
168     @Override
169     public long skip(long n) throws IOException {
170       if (!checkEOF()) {
171         long ret = in.skip(Math.min(remain, n));
172         remain -= ret;
173         return ret;
174       }
175       return 0;
176     }
177     
178     @Override
179     public boolean markSupported() {
180       return false;
181     }
182     
183     public boolean isClosed() {
184       return closed;
185     }
186     
187     @Override
188     public void close() throws IOException {
189       if (closed == false) {
190         try {
191           while (!checkEOF()) {
192             skip(Integer.MAX_VALUE);
193           }
194         } finally {
195           closed = true;
196         }
197       }
198     }
199   }
200   
201   /**
202    * Chunk Encoder. Encoding the output data into a chain of chunks in the following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n, byte[len_n].
203    * Where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks cannot have length 0.
204    * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format.
205    */
206   static public class ChunkEncoder extends OutputStream {
207     /**
208      * The data output stream it connects to.
209      */
210     private DataOutputStream out;
211     
212     /**
213      * The internal buffer that is only used when we do not know the advertised size.
214      */
215     private byte buf[];
216     
217     /**
218      * The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt> through
219      * <tt>buf[count-1]</tt> contain valid byte data.
220      */
221     private int count;
222     
223     /**
224      * Constructor.
225      * 
226      * @param out
227      *          the underlying output stream.
228      * @param buf
229      *          user-supplied buffer. The buffer would be used exclusively by the ChunkEncoder during its life cycle.
230      */
231     public ChunkEncoder(DataOutputStream out, byte[] buf) {
232       this.out = out;
233       this.buf = buf;
234       this.count = 0;
235     }
236     
237     /**
238      * Write out a chunk.
239      * 
240      * @param chunk
241      *          The chunk buffer.
242      * @param offset
243      *          Offset to chunk buffer for the beginning of chunk.
244      * @param len
245      * @param last
246      *          Is this the last call to flushBuffer?
247      */
248     private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException {
249       if (last) { // always write out the length for the last chunk.
250         Utils.writeVInt(out, len);
251         if (len > 0) {
252           out.write(chunk, offset, len);
253         }
254       } else {
255         if (len > 0) {
256           Utils.writeVInt(out, -len);
257           out.write(chunk, offset, len);
258         }
259       }
260     }
261     
262     /**
263      * Write out a chunk that is a concatenation of the internal buffer plus user supplied data. This will never be the last block.
264      * 
265      * @param data
266      *          User supplied data buffer.
267      * @param offset
268      *          Offset to user data buffer.
269      * @param len
270      *          User data buffer size.
271      */
272     private void writeBufData(byte[] data, int offset, int len) throws IOException {
273       if (count + len > 0) {
274         Utils.writeVInt(out, -(count + len));
275         out.write(buf, 0, count);
276         count = 0;
277         out.write(data, offset, len);
278       }
279     }
280     
281     /**
282      * Flush the internal buffer.
283      * 
284      * Is this the last call to flushBuffer?
285      * 
286      * @throws java.io.IOException
287      */
288     private void flushBuffer() throws IOException {
289       if (count > 0) {
290         writeChunk(buf, 0, count, false);
291         count = 0;
292       }
293     }
294     
295     @Override
296     public void write(int b) throws IOException {
297       if (count >= buf.length) {
298         flushBuffer();
299       }
300       buf[count++] = (byte) b;
301     }
302     
303     @Override
304     public void write(byte b[]) throws IOException {
305       write(b, 0, b.length);
306     }
307     
308     @Override
309     public void write(byte b[], int off, int len) throws IOException {
310       if ((len + count) >= buf.length) {
311         /*
312          * If the input data do not fit in buffer, flush the output buffer and then write the data directly. In this way buffered streams will cascade
313          * harmlessly.
314          */
315         writeBufData(b, off, len);
316         return;
317       }
318       
319       System.arraycopy(b, off, buf, count, len);
320       count += len;
321     }
322     
323     @Override
324     public void flush() throws IOException {
325       flushBuffer();
326       out.flush();
327     }
328     
329     @Override
330     public void close() throws IOException {
331       if (buf != null) {
332         try {
333           writeChunk(buf, 0, count, true);
334         } finally {
335           buf = null;
336           out = null;
337         }
338       }
339     }
340   }
341   
342   /**
343    * Encode the whole stream as a single chunk. Expecting to know the size of the chunk up-front.
344    */
345   static public class SingleChunkEncoder extends OutputStream {
346     /**
347      * The data output stream it connects to.
348      */
349     private final DataOutputStream out;
350     
351     /**
352      * The remaining bytes to be written.
353      */
354     private int remain;
355     private boolean closed = false;
356     
357     /**
358      * Constructor.
359      * 
360      * @param out
361      *          the underlying output stream.
362      * @param size
363      *          The total # of bytes to be written as a single chunk.
364      * @throws java.io.IOException
365      *           if an I/O error occurs.
366      */
367     public SingleChunkEncoder(DataOutputStream out, int size) throws IOException {
368       this.out = out;
369       this.remain = size;
370       Utils.writeVInt(out, size);
371     }
372     
373     @Override
374     public void write(int b) throws IOException {
375       if (remain > 0) {
376         out.write(b);
377         --remain;
378       } else {
379         throw new IOException("Writing more bytes than advertised size.");
380       }
381     }
382     
383     @Override
384     public void write(byte b[]) throws IOException {
385       write(b, 0, b.length);
386     }
387     
388     @Override
389     public void write(byte b[], int off, int len) throws IOException {
390       if (remain >= len) {
391         out.write(b, off, len);
392         remain -= len;
393       } else {
394         throw new IOException("Writing more bytes than advertised size.");
395       }
396     }
397     
398     @Override
399     public void flush() throws IOException {
400       out.flush();
401     }
402     
403     @Override
404     public void close() throws IOException {
405       if (closed == true) {
406         return;
407       }
408       
409       try {
410         if (remain > 0) {
411           throw new IOException("Writing less bytes than advertised size.");
412         }
413       } finally {
414         closed = true;
415       }
416     }
417   }
418 }