1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBuffInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.nio.ByteBuff;
45  import org.apache.hadoop.hbase.nio.MultiByteBuff;
46  import org.apache.hadoop.hbase.nio.SingleByteBuff;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.ChecksumType;
49  import org.apache.hadoop.hbase.util.ClassSize;
50  import org.apache.hadoop.io.IOUtils;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Preconditions;
54  
55  /**
56   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
57   * <ul>
58   * <li>In version 1 all blocks are always compressed or uncompressed, as
59   * specified by the {@link HFile}'s compression algorithm, with a type-specific
60   * magic record stored in the beginning of the compressed data (i.e. one needs
61   * to uncompress the compressed block to determine the block type). There is
62   * only a single compression algorithm setting for all blocks. Offset and size
63   * information from the block index are required to read a block.
64   * <li>In version 2 a block is structured as follows:
65   * <ul>
66   * <li>header (see Writer#finishBlock())
67   * <ul>
68   * <li>Magic record identifying the block type (8 bytes)
69   * <li>Compressed block size, excluding header, including checksum (4 bytes)
70   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
71   * <li>The offset of the previous block of the same type (8 bytes). This is
72   * used to navigate to the previous block without going to the block index.
73   * <li>For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
74   * <li>For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
75   * <li>For minorVersions &gt;=1, the size of data on disk, including header,
76   * excluding checksums (4 bytes)
77   * </ul>
78   * </li>
79   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
80   * same for all the blocks in the {@link HFile}, similarly to what was done in
81   * version 1.
82   * <li>For minorVersions &gt;=1, a series of 4 byte checksums, one each for
83   * the number of bytes specified by bytesPerChecksum.
84   * </ul>
85   * </ul>
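     * <p>
     * For illustration only, a hedged sketch of walking the version 2 header fields listed
     * above with a plain {@link java.nio.ByteBuffer}; the authoritative parsing lives in the
     * {@code HFileBlock(ByteBuff, boolean, MemoryType)} constructor below, and the
     * {@code usesHBaseChecksum} flag here is an illustrative assumption:
     * <pre>{@code
     * ByteBuffer b = ...;                        // positioned at the start of a version 2 block
     * byte[] magic = new byte[8];
     * b.get(magic);                              // block type magic record
     * int onDiskSizeWithoutHeader = b.getInt();
     * int uncompressedSizeWithoutHeader = b.getInt();
     * long prevBlockOffset = b.getLong();
     * if (usesHBaseChecksum) {                   // minorVersion >= 1
     *   byte checksumType = b.get();
     *   int bytesPerChecksum = b.getInt();
     *   int onDiskDataSizeWithHeader = b.getInt();
     * }
     * }</pre>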
86   */
87  @InterfaceAudience.Private
88  public class HFileBlock implements Cacheable {
89  
90    /**
91     * On a checksum failure on a Reader, this many succeeding read
92     * requests fall back to using HDFS checksums before HBase checksum
93     * verification is automatically re-enabled.
94     */
95    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96  
97    public static final boolean FILL_HEADER = true;
98    public static final boolean DONT_FILL_HEADER = false;
99  
100   /**
101    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
102    * This extends the normal header by adding the id of the encoder.
103    */
104   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105       + DataBlockEncoding.ID_SIZE;
106 
107   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109 
110   // TODO: how do we get this estimate right when the block is backed by a SingleByteBuff?
111   public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
112       new MultiByteBuff(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false);
113 
114   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
115   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
116       + Bytes.SIZEOF_LONG;
117 
118   /**
119    * Each checksum value is an integer that can be stored in 4 bytes.
120    */
121   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
122 
123   static final CacheableDeserializer<Cacheable> blockDeserializer =
124       new CacheableDeserializer<Cacheable>() {
125         public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
126             throws IOException {
127           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
128           ByteBuff newByteBuffer;
129           if (reuse) {
130             newByteBuffer = buf.slice();
131           } else {
132             // Used only in tests
133             int len = buf.limit();
134             newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len));
135             newByteBuffer.put(0, buf, buf.position(), len);
136           }
137           buf.position(buf.limit());
138           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
139           boolean usesChecksum = buf.get() == (byte)1;
140           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType);
141           hFileBlock.offset = buf.getLong();
142           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
143           if (hFileBlock.hasNextBlockHeader()) {
144             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
145           }
146           return hFileBlock;
147         }
148 
149         @Override
150         public int getDeserialiserIdentifier() {
151           return deserializerIdentifier;
152         }
153 
154         @Override
155         public HFileBlock deserialize(ByteBuff b) throws IOException {
156           // Used only in tests
157           return deserialize(b, false, MemoryType.EXCLUSIVE);
158         }
159       };
160   private static final int deserializerIdentifier;
161   static {
162     deserializerIdentifier = CacheableDeserializerIdManager
163         .registerDeserializer(blockDeserializer);
164   }
165 
166   /** Type of block. Header field 0. */
167   private BlockType blockType;
168 
169   /** Size on disk excluding header, including checksum. Header field 1. */
170   private int onDiskSizeWithoutHeader;
171 
172   /** Size of pure data. Does not include header or checksums. Header field 2. */
173   private final int uncompressedSizeWithoutHeader;
174 
175   /** The offset of the previous block on disk. Header field 3. */
176   private final long prevBlockOffset;
177 
178   /**
179    * Size on disk of header + data. Excludes checksum. Header field 6,
180    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
181    */
182   private final int onDiskDataSizeWithHeader;
183 
184   /** The in-memory representation of the hfile block */
185   private ByteBuff buf;
186 
187   /** Meta data that holds meta information on the hfileblock */
188   private HFileContext fileContext;
189 
190   /**
191    * The offset of this block in the file. Populated by the reader for
192    * convenience of access. This offset is not part of the block header.
193    */
194   private long offset = -1;
195 
196   /**
197    * The on-disk size of the next block, including the header, obtained by
198    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
199    * header, or -1 if unknown.
200    */
201   private int nextBlockOnDiskSizeWithHeader = -1;
202 
203   private MemoryType memType = MemoryType.EXCLUSIVE;
204 
205   /**
206    * Creates a new {@link HFile} block from the given fields. This constructor
207    * is mostly used when the block data has already been read and uncompressed,
208    * and is sitting in a byte buffer.
209    *
210    * @param blockType the type of this block, see {@link BlockType}
211    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
212    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
213    * @param prevBlockOffset see {@link #prevBlockOffset}
214    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
215    *          uncompressed data
216    * @param fillHeader when true, write the header fields into the first bytes of {@code buf}
217    * @param offset the file offset the block was read from
218    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
219    * @param fileContext HFile meta data
220    */
221   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
222       long prevBlockOffset, ByteBuff buf, boolean fillHeader, long offset,
223       int onDiskDataSizeWithHeader, HFileContext fileContext) {
224     this.blockType = blockType;
225     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
226     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
227     this.prevBlockOffset = prevBlockOffset;
228     this.buf = buf;
229     this.offset = offset;
230     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
231     this.fileContext = fileContext;
232     if (fillHeader)
233       overwriteHeader();
234     this.buf.rewind();
235   }
236 
237   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
238       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
239       int onDiskDataSizeWithHeader, HFileContext fileContext) {
240     this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
241         new SingleByteBuff(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext);
242   }
243 
244   /**
245    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
246    */
247   HFileBlock(HFileBlock that) {
248     this.blockType = that.blockType;
249     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
250     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
251     this.prevBlockOffset = that.prevBlockOffset;
252     this.buf = that.buf.duplicate();
253     this.offset = that.offset;
254     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
255     this.fileContext = that.fileContext;
256     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
257   }
258 
259   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
260     this(new SingleByteBuff(b), usesHBaseChecksum);
261   }
262 
263   /**
264    * Creates a block from an existing buffer starting with a header. Rewinds
265    * and takes ownership of the buffer. By definition of rewind, ignores the
266    * buffer position, but if you slice the buffer beforehand, it will rewind
267    * to that point.
268    */
269   HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
270     this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE);
271   }
272 
273   /**
274    * Creates a block from an existing buffer starting with a header. Rewinds
275    * and takes ownership of the buffer. By definition of rewind, ignores the
276    * buffer position, but if you slice the buffer beforehand, it will rewind
277    * to that point.
278    */
279   HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException {
280     b.rewind();
281     blockType = BlockType.read(b);
282     onDiskSizeWithoutHeader = b.getInt();
283     uncompressedSizeWithoutHeader = b.getInt();
284     prevBlockOffset = b.getLong();
285     HFileContextBuilder contextBuilder = new HFileContextBuilder();
286     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
287     if (usesHBaseChecksum) {
288       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
289       contextBuilder.withBytesPerCheckSum(b.getInt());
290       this.onDiskDataSizeWithHeader = b.getInt();
291     } else {
292       contextBuilder.withChecksumType(ChecksumType.NULL);
293       contextBuilder.withBytesPerCheckSum(0);
294       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
295                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
296     }
297     this.fileContext = contextBuilder.build();
298     this.memType = memType;
299     buf = b;
300     buf.rewind();
301   }
302 
303   public BlockType getBlockType() {
304     return blockType;
305   }
306 
307   /** @return the data block encoding id that was used to encode this block */
308   public short getDataBlockEncodingId() {
309     if (blockType != BlockType.ENCODED_DATA) {
310       throw new IllegalArgumentException("Querying encoder ID of a block " +
311           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
312     }
313     return buf.getShort(headerSize());
314   }
315 
316   /**
317    * @return the on-disk size of header + data part + checksum.
318    */
319   public int getOnDiskSizeWithHeader() {
320     return onDiskSizeWithoutHeader + headerSize();
321   }
322 
323   /**
324    * @return the on-disk size of the data part + checksum (header excluded).
325    */
326   public int getOnDiskSizeWithoutHeader() {
327     return onDiskSizeWithoutHeader;
328   }
329 
330   /**
331    * @return the uncompressed size of data part (header and checksum excluded).
332    */
333   public int getUncompressedSizeWithoutHeader() {
334     return uncompressedSizeWithoutHeader;
335   }
336 
337   /**
338    * @return the offset of the previous block of the same type in the file, or
339    *         -1 if unknown
340    */
341   public long getPrevBlockOffset() {
342     return prevBlockOffset;
343   }
344 
345   /**
346    * Rewinds {@code buf} and writes the header fields into it (checksum fields included
347    * when HBase checksums are in use). The position of {@code buf} is modified as a side effect.
348    */
349   private void overwriteHeader() {
350     buf.rewind();
351     blockType.write(buf);
352     buf.putInt(onDiskSizeWithoutHeader);
353     buf.putInt(uncompressedSizeWithoutHeader);
354     buf.putLong(prevBlockOffset);
355     if (this.fileContext.isUseHBaseChecksum()) {
356       buf.put(fileContext.getChecksumType().getCode());
357       buf.putInt(fileContext.getBytesPerChecksum());
358       buf.putInt(onDiskDataSizeWithHeader);
359     }
360   }
361 
362   /**
363    * Returns a buffer that does not include the header or checksum.
364    *
365    * @return the buffer with header skipped and checksum omitted.
366    */
367   public ByteBuff getBufferWithoutHeader() {
368     ByteBuff dup = this.buf.duplicate();
369     dup.position(headerSize());
370     dup.limit(buf.limit() - totalChecksumBytes());
371     return dup.slice();
372   }
373 
374   /**
375    * Returns the buffer this block stores internally. The clients must not
376    * modify the buffer object. This method has to be public because it is used
377    * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
378    * filter lookup, but has to be used with caution. Checksum data is not
379    * included in the returned buffer but header data is.
380    *
381    * @return the buffer of this block for read-only operations
382    */
383   public ByteBuff getBufferReadOnly() {
384     ByteBuff dup = this.buf.duplicate();
385     dup.limit(buf.limit() - totalChecksumBytes());
386     return dup.slice();
387   }
388 
389   /**
390    * Returns the buffer of this block, including header data. The clients must
391    * not modify the buffer object. This method has to be public because it is
392    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
393    *
394    * @return the buffer with header and checksum included for read-only operations
395    */
396   public ByteBuff getBufferReadOnlyWithHeader() {
397     ByteBuff dup = this.buf.duplicate();
398     return dup.slice();
399   }
400 
401   /**
402    * Returns a byte buffer of this block, including header data and checksum, positioned at
403    * the beginning of the header. The underlying data array is not copied.
404    *
405    * @return the byte buffer with header and checksum included
406    */
407   ByteBuff getBufferWithHeader() {
408     ByteBuff dupBuf = buf.duplicate();
409     dupBuf.rewind();
410     return dupBuf;
411   }
412 
413   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
414       String fieldName) throws IOException {
415     if (valueFromBuf != valueFromField) {
416       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
417           + ") is different from that in the field (" + valueFromField + ")");
418     }
419   }
420 
421   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
422       throws IOException {
423     if (valueFromBuf != valueFromField) {
424       throw new IOException("Block type stored in the buffer: " +
425         valueFromBuf + ", block type field: " + valueFromField);
426     }
427   }
428 
429   /**
430    * Checks if the block is internally consistent, i.e. the first
431    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
432    * valid header consistent with the fields. Assumes a packed block structure.
433    * This function is primarily for testing and debugging, and is not
434    * thread-safe, because it alters the internal buffer pointer.
435    */
436   void sanityCheck() throws IOException {
437     buf.rewind();
438 
439     sanityCheckAssertion(BlockType.read(buf), blockType);
440 
441     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
442         "onDiskSizeWithoutHeader");
443 
444     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
445         "uncompressedSizeWithoutHeader");
446 
447     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
448     if (this.fileContext.isUseHBaseChecksum()) {
449       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
450       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(),
451           "bytesPerChecksum");
452       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
453     }
454 
455     int cksumBytes = totalChecksumBytes();
456     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
457     if (buf.limit() != expectedBufLimit) {
458       throw new AssertionError("Expected buffer limit " + expectedBufLimit
459           + ", got " + buf.limit());
460     }
461 
462     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
463     // block's header, so there are two sensible values for buffer capacity.
464     int hdrSize = headerSize();
465     if (buf.capacity() != expectedBufLimit &&
466         buf.capacity() != expectedBufLimit + hdrSize) {
467       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
468           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
469     }
470   }
471 
472   @Override
473   public String toString() {
474     StringBuilder sb = new StringBuilder()
475       .append("HFileBlock [")
476       .append(" fileOffset=").append(offset)
477       .append(" headerSize()=").append(headerSize())
478       .append(" blockType=").append(blockType)
479       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
480       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
481       .append(" prevBlockOffset=").append(prevBlockOffset)
482       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
483     if (fileContext.isUseHBaseChecksum()) {
484       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
485         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
486         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
487     } else {
488       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
489         .append("(").append(onDiskSizeWithoutHeader)
490         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
491     }
492     String dataBegin = null;
493     if (buf.hasArray()) {
494       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
495           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
496     } else {
497       ByteBuff bufWithoutHeader = getBufferWithoutHeader();
498       byte[] dataBeginBytes = new byte[Math.min(32,
499           bufWithoutHeader.limit() - bufWithoutHeader.position())];
500       bufWithoutHeader.get(dataBeginBytes);
501       dataBegin = Bytes.toStringBinary(dataBeginBytes);
502     }
503     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
504       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
505       .append(" isUnpacked()=").append(isUnpacked())
506       .append(" buf=[ ").append(buf).append(" ]")
507       .append(" dataBeginsWith=").append(dataBegin)
508       .append(" fileContext=").append(fileContext)
509       .append(" ]");
510     return sb.toString();
511   }
512 
513   /**
514    * Called after reading a block with a caller-provided on-disk size; validates it against the size in the block header.
515    */
516   private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
517   throws IOException {
518     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
519       String dataBegin = null;
520       if (buf.hasArray()) {
521         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
522       } else {
523         ByteBuff bufDup = getBufferReadOnly();
524         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
525         bufDup.get(dataBeginBytes);
526         dataBegin = Bytes.toStringBinary(dataBeginBytes);
527       }
528       String blockInfoMsg =
529         "Block offset: " + offset + ", data starts with: " + dataBegin;
530       throw new IOException("On-disk size without header provided is "
531           + expectedOnDiskSizeWithoutHeader + ", but block "
532           + "header contains " + onDiskSizeWithoutHeader + ". " +
533           blockInfoMsg);
534     }
535   }
536 
537   /**
538    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
539    * encoded structure. Internal structures are shared between instances where applicable.
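       * <p>
       * A hedged sketch of a typical call, assuming {@code fsBlockReader} is the {@link FSReader}
       * this block was read with:
       * <pre>{@code
       * HFileBlock unpacked = block.unpack(fileContext, fsBlockReader);
       * }</pre>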
540    */
541   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
542     if (!fileContext.isCompressedOrEncrypted()) {
543       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
544       // which is used for block serialization to L2 cache, does not preserve encoding and
545       // encryption details.
546       return this;
547     }
548 
549     HFileBlock unpacked = new HFileBlock(this);
550     unpacked.allocateBuffer(); // allocates space for the decompressed block
551 
552     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
553       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
554 
555     ByteBuff dup = this.buf.duplicate();
556     dup.position(this.headerSize());
557     dup = dup.slice();
558     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
559       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
560       dup);
561 
562     // Preserve the next block's header bytes in the new block if we have them.
563     if (unpacked.hasNextBlockHeader()) {
564       // Both the buffers are limited till checksum bytes and avoid the next block's header.
565       // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
566       // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
567       // new BB objects
568       ByteBuff inDup = this.buf.duplicate();
569       inDup.limit(inDup.limit() + headerSize());
570       ByteBuff outDup = unpacked.buf.duplicate();
571       outDup.limit(outDup.limit() + unpacked.headerSize());
572       outDup.put(
573           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
574               + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader,
575           unpacked.headerSize());
576     }
577     return unpacked;
578   }
579 
580   /**
581    * Return true when this buffer includes next block's header.
582    */
583   private boolean hasNextBlockHeader() {
584     return nextBlockOnDiskSizeWithHeader > 0;
585   }
586 
587   /**
588    * Always allocates a new buffer of the correct size. Copies header bytes
589    * from the existing buffer. Does not change header fields.
590    * Reserves room for the checksum bytes as well.
591    */
592   private void allocateBuffer() {
593     int cksumBytes = totalChecksumBytes();
594     int headerSize = headerSize();
595     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
596         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
597 
598     // TODO: should we consider allocating this buffer off-heap?
599     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
600 
601     // Copy header bytes into newBuf.
602     // newBuf is HBB so no issue in calling array()
603     buf.position(0);
604     buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
605 
606     buf = new SingleByteBuff(newBuf);
607     // set limit to exclude next block's header
608     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
609   }
610 
611   /**
612    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
613    * calculated heuristic, not a tracked attribute of the block.
614    */
615   public boolean isUnpacked() {
616     final int cksumBytes = totalChecksumBytes();
617     final int headerSize = headerSize();
618     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
619     final int bufCapacity = buf.capacity();
620     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
621   }
622 
623   /** An additional sanity-check in case no compression or encryption is being used. */
624   public void assumeUncompressed() throws IOException {
625     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
626         totalChecksumBytes()) {
627       throw new IOException("Using no compression but "
628           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
629           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
630           + ", numChecksumBytes=" + totalChecksumBytes());
631     }
632   }
633 
634   /**
635    * @param expectedType the expected type of this block
636    * @throws IOException if this block's type is different than expected
637    */
638   public void expectType(BlockType expectedType) throws IOException {
639     if (blockType != expectedType) {
640       throw new IOException("Invalid block type: expected=" + expectedType
641           + ", actual=" + blockType);
642     }
643   }
644 
645   /** @return the offset of this block in the file it was read from */
646   public long getOffset() {
647     if (offset < 0) {
648       throw new IllegalStateException(
649           "HFile block offset not initialized properly");
650     }
651     return offset;
652   }
653 
654   /**
655    * @return a byte stream reading the data + checksum of this block
656    */
657   public DataInputStream getByteStream() {
658     ByteBuff dup = this.buf.duplicate();
659     dup.position(this.headerSize());
660     return new DataInputStream(new ByteBuffInputStream(dup));
661   }
662 
663   @Override
664   public long heapSize() {
665     long size = ClassSize.align(
666         ClassSize.OBJECT +
667         // Block type, multi byte buffer, MemoryType and meta references
668         4 * ClassSize.REFERENCE +
669         // onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, onDiskDataSizeWithHeader,
670         // and nextBlockOnDiskSizeWithHeader
671         4 * Bytes.SIZEOF_INT +
672         // This and previous block offset
673         2 * Bytes.SIZEOF_LONG +
674         // Heap size of the meta object. meta will be always not null.
675         fileContext.heapSize()
676     );
677 
678     if (buf != null) {
679       // Deep overhead of the byte buffer. Needs to be aligned separately.
680       size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
681     }
682 
683     return ClassSize.align(size);
684   }
685 
686   /**
687    * Read from an input stream. Analogous to
688    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
689    * number of "extra" bytes that would be desirable but not absolutely
690    * necessary to read.
691    *
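       * <p>
       * A hedged usage sketch: a reader can ask for a block's necessary bytes plus the size of a
       * block header as "extra" bytes, so the next block's header is picked up opportunistically
       * when available ({@code onDiskSize} and {@code hdrSize} are illustrative names):
       * <pre>{@code
       * byte[] dest = new byte[onDiskSize + hdrSize];
       * boolean gotNextHeader = HFileBlock.readWithExtra(in, dest, 0, onDiskSize, hdrSize);
       * // gotNextHeader == true means dest also holds the next block's header bytes
       * }</pre>
       *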
692    * @param in the input stream to read from
693    * @param buf the buffer to read into
694    * @param bufOffset the destination offset in the buffer
695    * @param necessaryLen the number of bytes that are absolutely necessary to
696    *          read
697    * @param extraLen the number of extra bytes that would be nice to read
698    * @return true if succeeded reading the extra bytes
699    * @throws IOException if failed to read the necessary bytes
700    */
701   public static boolean readWithExtra(InputStream in, byte[] buf,
702       int bufOffset, int necessaryLen, int extraLen) throws IOException {
703     int bytesRemaining = necessaryLen + extraLen;
704     while (bytesRemaining > 0) {
705       int ret = in.read(buf, bufOffset, bytesRemaining);
706       if (ret == -1 && bytesRemaining <= extraLen) {
707         // We could not read the "extra data", but that is OK.
708         break;
709       }
710 
711       if (ret < 0) {
712         throw new IOException("Premature EOF from inputStream (read "
713             + "returned " + ret + ", was trying to read " + necessaryLen
714             + " necessary bytes and " + extraLen + " extra bytes, "
715             + "successfully read "
716             + (necessaryLen + extraLen - bytesRemaining));
717       }
718       bufOffset += ret;
719       bytesRemaining -= ret;
720     }
721     return bytesRemaining <= 0;
722   }
723 
724   /**
725    * Read from an input stream. Analogous to
726    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
727    * positional read and specifies a number of "extra" bytes that would be
728    * desirable but not absolutely necessary to read.
729    *
730    * @param in the input stream to read from
731    * @param position the position within the stream from which to start reading
732    * @param buf the buffer to read into
733    * @param bufOffset the destination offset in the buffer
734    * @param necessaryLen the number of bytes that are absolutely necessary to
735    *     read
736    * @param extraLen the number of extra bytes that would be nice to read
737    * @return true if and only if extraLen is > 0 and reading those extra bytes
738    *     was successful
739    * @throws IOException if failed to read the necessary bytes
740    */
741   @VisibleForTesting
742   static boolean positionalReadWithExtra(FSDataInputStream in,
743       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
744       throws IOException {
745     int bytesRemaining = necessaryLen + extraLen;
746     int bytesRead = 0;
747     while (bytesRead < necessaryLen) {
748       int ret = in.read(position, buf, bufOffset, bytesRemaining);
749       if (ret < 0) {
750         throw new IOException("Premature EOF from inputStream (positional read "
751             + "returned " + ret + ", was trying to read " + necessaryLen
752             + " necessary bytes and " + extraLen + " extra bytes, "
753             + "successfully read " + bytesRead);
754       }
755       position += ret;
756       bufOffset += ret;
757       bytesRemaining -= ret;
758       bytesRead += ret;
759     }
760     return bytesRead != necessaryLen && bytesRemaining <= 0;
761   }
762 
763   /**
764    * @return the on-disk size of the next block (including the header size)
765    *         that was read by peeking into the next block's header
766    */
767   public int getNextBlockOnDiskSizeWithHeader() {
768     return nextBlockOnDiskSizeWithHeader;
769   }
770 
771   /**
772    * Unified version 2 {@link HFile} block writer. The intended usage pattern
773    * is as follows:
774    * <ol>
775    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
776    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
777    * <li>Write your data into the stream.
778    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
779    * store the serialized block into an external stream.
780    * <li>Repeat to write more blocks.
781    * </ol>
782    * <p>
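       * A minimal usage sketch (the {@code fileContext}, {@code cells} and {@code out} names are
       * illustrative only):
       * <pre>{@code
       * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
       * writer.startWriting(BlockType.DATA);
       * for (Cell cell : cells) {
       *   writer.write(cell);           // encodes the cell into the current block
       * }
       * writer.writeHeaderAndData(out); // out is an FSDataOutputStream
       * }</pre>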
783    */
784   public static class Writer {
785 
786     private enum State {
787       INIT,
788       WRITING,
789       BLOCK_READY
790     };
791 
792     /** Writer state. Used to ensure the correct usage protocol. */
793     private State state = State.INIT;
794 
795     /** Data block encoder used for data blocks */
796     private final HFileDataBlockEncoder dataBlockEncoder;
797 
798     private HFileBlockEncodingContext dataBlockEncodingCtx;
799 
800     /** block encoding context for non-data blocks */
801     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
802 
803     /**
804      * The stream we use to accumulate data in uncompressed format for each
805      * block. We reset this stream at the end of each block and reuse it. The
806      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
807      * stream.
808      */
809     private ByteArrayOutputStream baosInMemory;
810 
811     /**
812      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
813      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
814      * to {@link BlockType#ENCODED_DATA}.
815      */
816     private BlockType blockType;
817 
818     /**
819      * A stream that we write uncompressed bytes to, which compresses them and
820      * writes them to {@link #baosInMemory}.
821      */
822     private DataOutputStream userDataStream;
823 
824     // Size of the actual data being written, not considering block encoding/compression.
825     // This includes the header size also.
826     private int unencodedDataSizeWritten;
827 
828     /**
829      * Bytes to be written to the file system, including the header. Compressed
830      * if compression is turned on. It also includes the checksum data that
831      * immediately follows the block data. (header + data + checksums)
832      */
833     private byte[] onDiskBytesWithHeader;
834 
835     /**
836      * The size of the checksum data on disk. It is used only if data is
837      * not compressed. If data is compressed, then the checksums are already
838      * part of onDiskBytesWithHeader. If data is uncompressed, then this
839      * variable stores the checksum data for this block.
840      */
841     private byte[] onDiskChecksum;
842 
843     /**
844      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
845      * potentially encoded, if this is a data block) bytes, so the length is
846      * {@link #uncompressedSizeWithoutHeader} +
847      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
848      * Does not store checksums.
849      */
850     private byte[] uncompressedBytesWithHeader;
851 
852     /**
853      * Current block's start offset in the {@link HFile}. Set in
854      * {@link #writeHeaderAndData(FSDataOutputStream)}.
855      */
856     private long startOffset;
857 
858     /**
859      * Offset of previous block by block type. Updated when the next block is
860      * started.
861      */
862     private long[] prevOffsetByType;
863 
864     /** The offset of the previous block of the same type */
865     private long prevOffset;
866     /** Meta data that holds information about the hfileblock */
867     private HFileContext fileContext;
868 
869     /**
870      * @param dataBlockEncoder data block encoding algorithm to use
871      */
872     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
873       this.dataBlockEncoder = dataBlockEncoder != null
874           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
875       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
876           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
877       dataBlockEncodingCtx = this.dataBlockEncoder
878           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
879 
880       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
881         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
882             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
883             fileContext.getBytesPerChecksum());
884       }
885 
886       baosInMemory = new ByteArrayOutputStream();
887 
888       prevOffsetByType = new long[BlockType.values().length];
889       for (int i = 0; i < prevOffsetByType.length; ++i)
890         prevOffsetByType[i] = -1;
891 
892       this.fileContext = fileContext;
893     }
894 
895     /**
896      * Starts writing into the block. The previous block's data is discarded.
897      *
898      * @return the stream the user can write their data into
899      * @throws IOException
900      */
901     public DataOutputStream startWriting(BlockType newBlockType)
902         throws IOException {
903       if (state == State.BLOCK_READY && startOffset != -1) {
904         // We had a previous block that was written to a stream at a specific
905         // offset. Save that offset as the last offset of a block of that type.
906         prevOffsetByType[blockType.getId()] = startOffset;
907       }
908 
909       startOffset = -1;
910       blockType = newBlockType;
911 
912       baosInMemory.reset();
913       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
914 
915       state = State.WRITING;
916 
917       // We will compress it later in finishBlock()
918       userDataStream = new DataOutputStream(baosInMemory);
919       if (newBlockType == BlockType.DATA) {
920         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
921       }
922       this.unencodedDataSizeWritten = 0;
923       return userDataStream;
924     }
925 
926     /**
927      * Writes the given Cell into this block.
928      * @param cell the Cell to write
929      * @throws IOException if the data block encoder fails to encode the Cell
930      */
931     public void write(Cell cell) throws IOException {
932       expectState(State.WRITING);
933       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
934           this.userDataStream);
935     }
936 
937     /**
938      * Returns the stream for the user to write to. The block writer takes care
939      * of handling compression and buffering for caching on write. Can only be
940      * called in the "writing" state.
941      *
942      * @return the data output stream for the user to write to
943      */
944     DataOutputStream getUserDataStream() {
945       expectState(State.WRITING);
946       return userDataStream;
947     }
948 
949     /**
950      * Transitions the block writer from the "writing" state to the "block
951      * ready" state.  Does nothing if a block is already finished.
952      */
953     void ensureBlockReady() throws IOException {
954       Preconditions.checkState(state != State.INIT,
955           "Unexpected state: " + state);
956 
957       if (state == State.BLOCK_READY)
958         return;
959 
960       // This will set state to BLOCK_READY.
961       finishBlock();
962     }
963 
964     /**
965      * An internal method that flushes the compressing stream (if using
966      * compression), serializes the header, and takes care of the separate
967      * uncompressed stream for caching on write, if applicable. Sets block
968      * write state to "block ready".
969      */
970     private void finishBlock() throws IOException {
971       if (blockType == BlockType.DATA) {
972         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
973             new BufferGrabbingByteArrayOutputStream();
974         baosInMemory.writeTo(baosInMemoryCopy);
975         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
976             baosInMemoryCopy.buf, blockType);
977         blockType = dataBlockEncodingCtx.getBlockType();
978       }
979       userDataStream.flush();
980       // This does an array copy, so it is safe to cache this byte array.
981       uncompressedBytesWithHeader = baosInMemory.toByteArray();
982       prevOffset = prevOffsetByType[blockType.getId()];
983 
984       // We need to set state before we can package the block up for
985       // cache-on-write. In a way, the block is ready, but not yet encoded or
986       // compressed.
987       state = State.BLOCK_READY;
988       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
989         onDiskBytesWithHeader = dataBlockEncodingCtx
990             .compressAndEncrypt(uncompressedBytesWithHeader);
991       } else {
992         onDiskBytesWithHeader = defaultBlockEncodingCtx
993             .compressAndEncrypt(uncompressedBytesWithHeader);
994       }
995       int numBytes = (int) ChecksumUtil.numBytes(
996           onDiskBytesWithHeader.length,
997           fileContext.getBytesPerChecksum());
998 
999       // put the header for on disk bytes
1000       putHeader(onDiskBytesWithHeader, 0,
1001           onDiskBytesWithHeader.length + numBytes,
1002           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1003       // set the header for the uncompressed bytes (for cache-on-write)
1004       putHeader(uncompressedBytesWithHeader, 0,
1005           onDiskBytesWithHeader.length + numBytes,
1006           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1007 
1008       onDiskChecksum = new byte[numBytes];
1009       ChecksumUtil.generateChecksums(
1010           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1011           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1012     }
1013 
1014     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1015       private byte[] buf;
1016 
1017       @Override
1018       public void write(byte[] b, int off, int len) {
1019         this.buf = b; // grab a reference to the caller's buffer rather than copying it
1020       }
1021 
1022       public byte[] getBuffer() {
1023         return this.buf;
1024       }
1025     }
1026 
1027     /**
1028      * Put the header into the given byte array at the given offset.
1029      * @param onDiskSize size of the block on disk header + data + checksum
1030      * @param uncompressedSize size of the block after decompression (but
1031      *          before optional data block decoding) including header
1032      * @param onDiskDataSize size of the block on disk with header
1033      *        and data but not including the checksums
1034      */
1035     private void putHeader(byte[] dest, int offset, int onDiskSize,
1036         int uncompressedSize, int onDiskDataSize) {
1037       offset = blockType.put(dest, offset);
1038       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1039       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1040       offset = Bytes.putLong(dest, offset, prevOffset);
1041       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1042       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1043       Bytes.putInt(dest, offset, onDiskDataSize);
1044     }
1045 
1046     /**
1047      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1048      * the offset of this block so that it can be referenced in the next block
1049      * of the same type.
1050      *
1051      * @param out the output stream to write to
1052      * @throws IOException
1053      */
1054     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1055       long offset = out.getPos();
1056       if (startOffset != -1 && offset != startOffset) {
1057         throw new IOException("A " + blockType + " block written to a "
1058             + "stream twice, first at offset " + startOffset + ", then at "
1059             + offset);
1060       }
1061       startOffset = offset;
1062 
1063       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1064     }
1065 
1066     /**
1067      * Writes the header and the compressed data of this block (or uncompressed
1068      * data when not using compression) into the given stream. Can be called in
1069      * the "writing" state or in the "block ready" state. If called in the
1070      * "writing" state, transitions the writer to the "block ready" state.
1071      *
1072      * @param out the output stream to write the block to
1073      * @throws IOException
1074      */
1075     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1076       throws IOException {
1077       ensureBlockReady();
1078       out.write(onDiskBytesWithHeader);
1079       out.write(onDiskChecksum);
1080     }
1081 
1082     /**
1083      * Returns the header followed by the compressed data (or uncompressed data when
1084      * not using compression) as a byte array. Can be called in the "writing" state
1085      * or in the "block ready" state. If called in the "writing" state, this
1086      * transitions the writer to the "block ready" state. The result contains
1087      * the header + data + checksums as stored on disk.
1088      *
1089      * @return header and data as they would be stored on disk in a byte array
1090      * @throws IOException
1091      */
1092     byte[] getHeaderAndDataForTest() throws IOException {
1093       ensureBlockReady();
1094       // This is not very optimal, because we are doing an extra copy.
1095       // But this method is used only by unit tests.
1096       byte[] output =
1097           new byte[onDiskBytesWithHeader.length
1098               + onDiskChecksum.length];
1099       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1100           onDiskBytesWithHeader.length);
1101       System.arraycopy(onDiskChecksum, 0, output,
1102           onDiskBytesWithHeader.length, onDiskChecksum.length);
1103       return output;
1104     }
1105 
1106     /**
1107      * Releases resources used by this writer.
1108      */
1109     public void release() {
1110       if (dataBlockEncodingCtx != null) {
1111         dataBlockEncodingCtx.close();
1112         dataBlockEncodingCtx = null;
1113       }
1114       if (defaultBlockEncodingCtx != null) {
1115         defaultBlockEncodingCtx.close();
1116         defaultBlockEncodingCtx = null;
1117       }
1118     }
1119 
1120     /**
1121      * Returns the on-disk size of the data portion of the block. This is the
1122      * compressed size if compression is enabled. Can only be called in the
1123      * "block ready" state. Header is not compressed, and its size is not
1124      * included in the return value.
1125      *
1126      * @return the on-disk size of the block, not including the header.
1127      */
1128     int getOnDiskSizeWithoutHeader() {
1129       expectState(State.BLOCK_READY);
1130       return onDiskBytesWithHeader.length
1131           + onDiskChecksum.length
1132           - HConstants.HFILEBLOCK_HEADER_SIZE;
1133     }
1134 
1135     /**
1136      * Returns the on-disk size of the block. Can only be called in the
1137      * "block ready" state.
1138      *
1139      * @return the on-disk size of the block ready to be written, including the
1140      *         header size, the data and the checksum data.
1141      */
1142     int getOnDiskSizeWithHeader() {
1143       expectState(State.BLOCK_READY);
1144       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1145     }
1146 
1147     /**
1148      * The uncompressed size of the block data. Does not include header size.
1149      */
1150     int getUncompressedSizeWithoutHeader() {
1151       expectState(State.BLOCK_READY);
1152       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1153     }
1154 
1155     /**
1156      * The uncompressed size of the block data, including header size.
1157      */
1158     int getUncompressedSizeWithHeader() {
1159       expectState(State.BLOCK_READY);
1160       return uncompressedBytesWithHeader.length;
1161     }
1162 
1163     /** @return true if a block is being written  */
1164     public boolean isWriting() {
1165       return state == State.WRITING;
1166     }
1167 
1168     /**
1169      * Returns the number of bytes written into the current block so far, or
1170      * zero if not writing the block at the moment. Note that this will return
1171      * zero in the "block ready" state as well.
1172      *
1173      * @return the number of bytes written
1174      */
1175     public int blockSizeWritten() {
1176       if (state != State.WRITING) return 0;
1177       return this.unencodedDataSizeWritten;
1178     }
1179 
1180     /**
1181      * Returns the header followed by the uncompressed data, even if using
1182      * compression. This is needed for storing uncompressed blocks in the block
1183      * cache. Can be called in the "writing" state or the "block ready" state.
1184      * Returns only the header and data, does not include checksum data.
1185      *
1186      * @return uncompressed block bytes for caching on write
1187      */
1188     ByteBuffer getUncompressedBufferWithHeader() {
1189       expectState(State.BLOCK_READY);
1190       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1191     }
1192 
1193     /**
1194      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1195      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1196      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data;
1197      * does not include checksum data.
1198      *
1199      * @return packed block bytes for caching on write
1200      */
1201     ByteBuffer getOnDiskBufferWithHeader() {
1202       expectState(State.BLOCK_READY);
1203       return ByteBuffer.wrap(onDiskBytesWithHeader);
1204     }
1205 
1206     private void expectState(State expectedState) {
1207       if (state != expectedState) {
1208         throw new IllegalStateException("Expected state: " + expectedState +
1209             ", actual state: " + state);
1210       }
1211     }
1212 
1213     /**
1214      * Takes the given {@link BlockWritable} instance, creates a new block of
1215      * its appropriate type, writes the writable into this block, and flushes
1216      * the block into the output stream. The writer is instructed not to buffer
1217      * uncompressed bytes for cache-on-write.
1218      *
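          * A hedged sketch of a caller (the block type and payload below are illustrative only):
          * <pre>{@code
          * writer.writeBlock(new BlockWritable() {
          *   public BlockType getBlockType() {
          *     return BlockType.META;
          *   }
          *   public void writeToBlock(DataOutput out) throws IOException {
          *     out.writeUTF("example payload");
          *   }
          * }, fsOut);
          * }</pre>
          *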
1219      * @param bw the block-writable object to write as a block
1220      * @param out the file system output stream
1221      * @throws IOException
1222      */
1223     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1224         throws IOException {
1225       bw.writeToBlock(startWriting(bw.getBlockType()));
1226       writeHeaderAndData(out);
1227     }
1228 
1229     /**
1230      * Creates a new HFileBlock. Checksums have already been validated, so
1231      * the byte buffer passed into the constructor of this newly created
1232      * block does not have checksum data even though the header minor
1233      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1234      * 0 value in bytesPerChecksum.
1235      */
1236     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1237       HFileContext newContext = new HFileContextBuilder()
1238                                 .withBlockSize(fileContext.getBlocksize())
1239                                 .withBytesPerCheckSum(0)
1240                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1241                                 .withCompression(fileContext.getCompression())
1242                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1243                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1244                                 .withCompressTags(fileContext.isCompressTags())
1245                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1246                                 .withIncludesTags(fileContext.isIncludesTags())
1247                                 .build();
1248       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1249           getUncompressedSizeWithoutHeader(), prevOffset,
1250           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1251             getOnDiskBufferWithHeader() :
1252             getUncompressedBufferWithHeader(),
1253           FILL_HEADER, startOffset,
1254           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1255     }
1256   }
1257 
1258   /** Something that can be written into a block. */
1259   public interface BlockWritable {
1260 
1261     /** The type of block this data should use. */
1262     BlockType getBlockType();
1263 
1264     /**
1265      * Writes the block to the provided stream. Must not write any magic
1266      * records.
1267      *
1268      * @param out a stream to write uncompressed data into
1269      */
1270     void writeToBlock(DataOutput out) throws IOException;
1271   }
1272 
1273   // Block readers and writers
1274 
1275   /** An interface for iterating over {@link HFileBlock}s. */
1276   public interface BlockIterator {
1277 
1278     /**
1279      * Get the next block, or null if there are no more blocks to iterate.
1280      */
1281     HFileBlock nextBlock() throws IOException;
1282 
1283     /**
1284      * Similar to {@link #nextBlock()} but checks block type, throws an
1285      * exception if incorrect, and returns the HFile block.
1286      */
1287     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1288   }
1289 
1290   /** A full-fledged reader with iteration ability. */
1291   public interface FSReader {
1292 
1293     /**
1294      * Reads the block at the given offset in the file with the given on-disk
1295      * size and uncompressed size.
1296      *
1297      * @param offset the offset in the file at which the block starts
1298      * @param onDiskSize the on-disk size of the entire block, including all
1299      *          applicable headers, or -1 if unknown
1300      * @param uncompressedSize the uncompressed size of the compressed part of
1301      *          the block, or -1 if unknown
1302      * @return the newly read block
1303      */
1304     HFileBlock readBlockData(long offset, long onDiskSize,
1305         int uncompressedSize, boolean pread) throws IOException;
1306 
1307     /**
1308      * Creates a block iterator over the given portion of the {@link HFile}.
1309      * The iterator returns blocks whose offsets lie in the range startOffset &lt;=
1310      * offset &lt; endOffset. Returned blocks are always unpacked.
1311      *
1312      * @param startOffset the offset of the block to start iteration with
1313      * @param endOffset the offset to end iteration at (exclusive)
1314      * @return an iterator of blocks between the two given offsets
1315      */
1316     BlockIterator blockRange(long startOffset, long endOffset);
1317 
1318     /** Closes the backing streams */
1319     void closeStreams() throws IOException;
1320 
1321     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1322     HFileBlockDecodingContext getBlockDecodingContext();
1323 
1324     /** Get the default decoder for blocks from this file. */
1325     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1326 
1327     void setIncludesMemstoreTS(boolean includesMemstoreTS);
1328     void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1329   }
1330 
1331   /**
1332    * We always prefetch the header of the next block, so that we know its
1333    * on-disk size in advance and can read it in one operation.
1334    */
1335   private static class PrefetchedHeader {
1336     long offset = -1;
1337     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1338     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1339   }
1340 
1341   /** Reads version 2 blocks from the filesystem. */
1342   static class FSReaderImpl implements FSReader {
1343     /** The file system stream of the underlying {@link HFile}; it may or
1344      * may not perform checksum validation in the filesystem. */
1345     protected FSDataInputStreamWrapper streamWrapper;
1346 
1347     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1348 
1349     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1350     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1351 
1352     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1353         new ThreadLocal<PrefetchedHeader>() {
1354       @Override
1355       public PrefetchedHeader initialValue() {
1356         return new PrefetchedHeader();
1357       }
1358     };
1359 
1360 
1361 
1362     /** The size of the file we are reading from, or -1 if unknown. */
1363     protected long fileSize;
1364 
1365     /** The size of the header */
1366     protected final int hdrSize;
1367 
1368     /** The filesystem used to access data */
1369     protected HFileSystem hfs;
1370 
1371     /** The path (if any) where this data is coming from */
1372     protected Path path;
1373 
1374     private final Lock streamLock = new ReentrantLock();
1375 
1376     /** The default buffer size for our buffered streams */
1377     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1378 
1379     protected HFileContext fileContext;
1380 
1381     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1382         HFileContext fileContext) throws IOException {
1383       this.fileSize = fileSize;
1384       this.hfs = hfs;
1385       this.path = path;
1386       this.fileContext = fileContext;
1387       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1388 
1389       this.streamWrapper = stream;
1390       // Older versions of HBase didn't support checksum.
1391       // Older versions of HBase didn't support checksums.
1392       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1393       encodedBlockDecodingCtx = defaultDecodingCtx;
1394     }
1395 
1396     /**
1397      * A constructor that reads files with the latest minor version.
1398      * This is used by unit tests only.
1399      */
1400     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1401     throws IOException {
1402       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1403     }
1404 
1405     public BlockIterator blockRange(final long startOffset, final long endOffset) {
1406       final FSReader owner = this; // handle for inner class
1407       return new BlockIterator() {
1408         private long offset = startOffset;
1409 
1410         @Override
1411         public HFileBlock nextBlock() throws IOException {
1412           if (offset >= endOffset)
1413             return null;
1414           HFileBlock b = readBlockData(offset, -1, -1, false);
1415           offset += b.getOnDiskSizeWithHeader();
1416           return b.unpack(fileContext, owner);
1417         }
1418 
1419         @Override
1420         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1421             throws IOException {
1422           HFileBlock blk = nextBlock();
1423           if (blk.getBlockType() != blockType) {
1424             throw new IOException("Expected block of type " + blockType
1425                 + " but found " + blk.getBlockType());
1426           }
1427           return blk;
1428         }
1429       };
1430     }
1431 
1432     /**
1433      * Does a positional read or a seek and read into the given buffer. Returns
1434      * the on-disk size of the next block, or -1 if it could not be determined.
1435      *
1436      * @param dest destination buffer
1437      * @param destOffset offset in the destination buffer
1438      * @param size size of the block to be read
1439      * @param peekIntoNextBlock whether to read the next block's on-disk size
1440      * @param fileOffset position in the stream to read at
1441      * @param pread whether we should do a positional read
1442      * @param istream The input source of data
1443      * @return the on-disk size of the next block with header size included, or
1444      *         -1 if it could not be determined
1445      * @throws IOException
1446      */
1447     protected int readAtOffset(FSDataInputStream istream,
1448         byte[] dest, int destOffset, int size,
1449         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1450         throws IOException {
1451       if (peekIntoNextBlock &&
1452           destOffset + size + hdrSize > dest.length) {
1453         // We are asked to read the next block's header as well, but there is
1454         // not enough room in the array.
1455         throw new IOException("Attempted to read " + size + " bytes and " +
1456             hdrSize + " bytes of next header into a " + dest.length +
1457             "-byte array at offset " + destOffset);
1458       }
1459 
1460       if (!pread && streamLock.tryLock()) {
1461         // Seek + read. Better for scanning.
1462         try {
1463           istream.seek(fileOffset);
1464 
1465           long realOffset = istream.getPos();
1466           if (realOffset != fileOffset) {
1467             throw new IOException("Tried to seek to " + fileOffset + " to "
1468                 + "read " + size + " bytes, but pos=" + realOffset
1469                 + " after seek");
1470           }
1471 
1472           if (!peekIntoNextBlock) {
1473             IOUtils.readFully(istream, dest, destOffset, size);
1474             return -1;
1475           }
1476 
1477           // Try to read the next block header.
1478           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1479             return -1;
1480         } finally {
1481           streamLock.unlock();
1482         }
1483       } else {
1484         // Positional read. Better for random reads; or when the streamLock is already locked.
1485         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1486         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1487             size, extraSize)) {
1488           return -1;
1489         }
1490       }
1491 
1492       assert peekIntoNextBlock;
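           // The next block's header now sits at dest[destOffset + size]. Skip its magic
           // record (BlockType.MAGIC_LENGTH bytes) to read the 4-byte on-disk size, which
           // excludes the header, then add hdrSize to report the size with header included.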
1493       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1494     }
1495 
1496     /**
1497      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1498      * little memory allocation as possible, using the provided on-disk size.
1499      *
1500      * @param offset the offset in the stream to read at
1501      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1502      *          the header, or -1 if unknown
1503      * @param uncompressedSize the uncompressed size of the block. Always
1504      *          expected to be -1. This parameter is only used in version 1.
1505      * @param pread whether to use a positional read
1506      */
1507     @Override
1508     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1509         int uncompressedSize, boolean pread)
1510     throws IOException {
1511 
1512       // get a copy of the current state of whether to validate
1513       // hbase checksums or not for this read call. This is not
1514       // thread-safe but the one constraint is that if we decide
1515       // to skip hbase checksum verification then we are
1516       // guaranteed to use hdfs checksum verification.
1517       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1518       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1519 
1520       HFileBlock blk = readBlockDataInternal(is, offset,
1521                          onDiskSizeWithHeaderL,
1522                          uncompressedSize, pread,
1523                          doVerificationThruHBaseChecksum);
1524       if (blk == null) {
1525         HFile.LOG.warn("HBase checksum verification failed for file " +
1526                        path + " at offset " +
1527                        offset + " filesize " + fileSize +
1528                        ". Retrying read with HDFS checksums turned on...");
1529 
1530         if (!doVerificationThruHBaseChecksum) {
1531           String msg = "HBase checksum verification failed for file " +
1532                        path + " at offset " +
1533                        offset + " filesize " + fileSize +
1534                        " but this cannot happen because doVerify is " +
1535                        doVerificationThruHBaseChecksum;
1536           HFile.LOG.warn(msg);
1537           throw new IOException(msg); // cannot happen case here
1538         }
1539         HFile.checksumFailures.incrementAndGet(); // update metrics
1540 
1541         // If we have a checksum failure, we fall back into a mode where
1542         // the next few reads use HDFS level checksums. We aim to make the
1543         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1544         // hbase checksum verification, but since this value is set without
1545         // holding any locks, it can so happen that we might actually do
1546         // a few more than precisely this number.
1547         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1548         doVerificationThruHBaseChecksum = false;
1549         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1550                                     uncompressedSize, pread,
1551                                     doVerificationThruHBaseChecksum);
1552         if (blk != null) {
1553           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1554                          path + " at offset " +
1555                          offset + " filesize " + fileSize);
1556         }
1557       }
1558       if (blk == null && !doVerificationThruHBaseChecksum) {
1559         String msg = "readBlockData failed, possibly due to " +
1560                      "checksum verification failure for file " + path +
1561                      " at offset " + offset + " filesize " + fileSize;
1562         HFile.LOG.warn(msg);
1563         throw new IOException(msg);
1564       }
1565 
1566       // If there is a checksum mismatch earlier, then retry with
1567       // HBase checksums switched off and use HDFS checksum verification.
1568       // This triggers HDFS to detect and fix corrupt replicas. The
1569       // next checksumOffCount read requests will use HDFS checksums.
1570       // The decrementing of this.checksumOffCount is not thread-safe,
1571       // but it is harmless because eventually checksumOffCount will be
1572       // a negative number.
1573       streamWrapper.checksumOk();
1574       return blk;
1575     }
1576 
1577     /**
1578      * Reads a version 2 block.
1579      *
1580      * @param offset the offset in the stream to read at
1581      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1582      *          the header, or -1 if unknown
1583      * @param uncompressedSize the uncompressed size of the block. Always
1584      *          expected to be -1. This parameter is only used in version 1.
1585      * @param pread whether to use a positional read
1586      * @param verifyChecksum Whether to use HBase checksums.
1587      *        If HBase checksum is switched off, then use HDFS checksum.
1588      * @return the HFileBlock or null if there is a HBase checksum mismatch
1589      */
1590     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1591         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1592         boolean verifyChecksum)
1593     throws IOException {
1594       if (offset < 0) {
1595         throw new IOException("Invalid offset=" + offset + " trying to read "
1596             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1597             + ", uncompressedSize=" + uncompressedSize + ")");
1598       }
1599 
1600       if (uncompressedSize != -1) {
1601         throw new IOException("Version 2 block reader API does not need " +
1602             "the uncompressed size parameter");
1603       }
1604 
1605       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1606           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1607         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1608             + ": expected to be at least " + hdrSize
1609             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1610             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1611       }
1612 
1613       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1614       // See if we can avoid reading the header. This is desirable, because
1615       // we will not incur a backward seek operation if we have already
1616       // read this block's header as part of the previous read's look-ahead.
1617       // And we also want to skip reading the header again if it has already
1618       // been read.
1619       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1620       // is pertinent and we have to be reading next block as in a big scan.
1621       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1622       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1623 
1624       // Allocate enough space to fit the next block's header too.
1625       int nextBlockOnDiskSize = 0;
1626       byte[] onDiskBlock = null;
1627 
1628       HFileBlock b = null;
1629       if (onDiskSizeWithHeader > 0) {
1630         // We know the total on-disk size. Read the entire block into memory,
1631         // then parse the header. This code path is used when
1632         // doing a random read operation relying on the block index, as well as
1633         // when the client knows the on-disk size from peeking into the next
1634         // block's header (e.g. this block's header) when reading the previous
1635         // block. This is the faster and more preferable case.
1636 
1637         // Size that we have to skip in case we have already read the header.
1638         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1639         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1640                                                                 // next block's header
1641         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1642             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1643             true, offset + preReadHeaderSize, pread);
1644         if (headerBuf != null) {
1645           // the header has been read when reading the previous block, copy
1646           // to this block's header
1647           // headerBuf is HBB
1648           assert headerBuf.hasArray();
1649           System.arraycopy(headerBuf.array(),
1650               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1651         } else {
1652           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1653         }
1654         // We know the total on-disk size but not the uncompressed size. Parse the header.
1655         try {
1656           // TODO: FIX!!! Expensive parse just to get a length
1657           b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1658         } catch (IOException ex) {
1659           // Seen in load testing. Provide comprehensive debug info.
1660           throw new IOException("Failed to read compressed block at "
1661               + offset
1662               + ", onDiskSizeWithHeader="
1663               + onDiskSizeWithHeader
1664               + ", hdrSize="
1665               + hdrSize
1666               + ", header.length="
1667               + prefetchedHeader.header.length
1668               + ", header bytes: "
1669               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1670                   hdrSize), ex);
1671         }
1672         // if the caller specifies an onDiskSizeWithHeader, validate it.
1673         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1674         assert onDiskSizeWithoutHeader >= 0;
1675         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1676       } else {
1677         // Check headerBuf to see if we have read this block's header as part of
1678         // reading the previous block. This is an optimization of peeking into
1679         // the next block's header (e.g. this block's header) when reading the
1680         // previous block. This is the faster and more preferable case. If the
1681         // header is already there, don't read the header again.
1682 
1683         // Unfortunately, we still have to do a separate read operation to
1684         // read the header.
1685         if (headerBuf == null) {
1686           // From the header, determine the on-disk size of the given hfile
1687           // block, and read the remaining data, thereby incurring two read
1688           // operations. This might happen when we are doing the first read
1689           // in a series of reads or a random read, and we don't have access
1690           // to the block index. This is costly and should happen very rarely.
1691           headerBuf = ByteBuffer.allocate(hdrSize);
1692           // headerBuf is HBB
1693           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1694               hdrSize, false, offset, pread);
1695         }
1696         // TODO: FIX!!! Expensive parse just to get a length
1697         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1698         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1699         // headerBuf is HBB
1700         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1701         nextBlockOnDiskSize =
1702           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1703               - hdrSize, true, offset + hdrSize, pread);
1704         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1705       }
1706 
1707       if (!fileContext.isCompressedOrEncrypted()) {
1708         b.assumeUncompressed();
1709       }
1710 
1711       if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1712         return null;             // checksum mismatch
1713       }
1714 
1715       // The onDiskBlock will become the headerAndDataBuffer for this block.
1716       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1717       // contains the header of next block, so no need to set next
1718       // block's header in it.
1719       b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1720         this.fileContext.isUseHBaseChecksum());
1721 
1722       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1723 
1724       // Set prefetched header
1725       if (b.hasNextBlockHeader()) {
1726         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1727         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1728       }
1729 
1730       b.offset = offset;
1731       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1732       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1733       return b;
1734     }
1735 
1736     public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1737       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1738     }
1739 
1740     public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1741       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1742     }
1743 
1744     @Override
1745     public HFileBlockDecodingContext getBlockDecodingContext() {
1746       return this.encodedBlockDecodingCtx;
1747     }
1748 
1749     @Override
1750     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1751       return this.defaultDecodingCtx;
1752     }
1753 
1754     /**
1755      * Generates the checksum for the header as well as the data and
1756      * then validates that it matches the value stored in the header.
1757      * If there is a checksum mismatch, then return false. Otherwise
1758      * return true.
1759      */
1760     protected boolean validateBlockChecksum(HFileBlock block,  byte[] data, int hdrSize)
1761         throws IOException {
1762       return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
1763     }
1764 
1765     @Override
1766     public void closeStreams() throws IOException {
1767       streamWrapper.close();
1768     }
1769 
1770     @Override
1771     public String toString() {
1772       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1773     }
1774   }
1775 
1776   @Override
1777   public int getSerializedLength() {
1778     if (buf != null) {
1779       // include extra bytes for the next header when it's available.
1780       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1781       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1782     }
1783     return 0;
1784   }
1785 
1786   @Override
1787   public void serialize(ByteBuffer destination) {
1788     this.buf.get(destination, 0, getSerializedLength()
1789         - EXTRA_SERIALIZATION_SPACE);
1790     serializeExtraInfo(destination);
1791   }
1792 
1793   public void serializeExtraInfo(ByteBuffer destination) {
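         // Trailer written after the block bytes: HBase-checksum flag (1 byte), the block's
         // offset in the file (8 bytes), and the next block's on-disk size with header
         // (4 bytes); getSerializedLength() reserves EXTRA_SERIALIZATION_SPACE for this.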
1794     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1795     destination.putLong(this.offset);
1796     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1797     destination.rewind();
1798   }
1799 
1800   @Override
1801   public CacheableDeserializer<Cacheable> getDeserializer() {
1802     return HFileBlock.blockDeserializer;
1803   }
1804 
1805   @Override
1806   public int hashCode() {
1807     int result = 1;
1808     result = result * 31 + blockType.hashCode();
1809     result = result * 31 + nextBlockOnDiskSizeWithHeader;
1810     result = result * 31 + (int) (offset ^ (offset >>> 32));
1811     result = result * 31 + onDiskSizeWithoutHeader;
1812     result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1813     result = result * 31 + uncompressedSizeWithoutHeader;
1814     result = result * 31 + buf.hashCode();
1815     return result;
1816   }
1817 
1818   @Override
1819   public boolean equals(Object comparison) {
1820     if (this == comparison) {
1821       return true;
1822     }
1823     if (comparison == null) {
1824       return false;
1825     }
1826     if (comparison.getClass() != this.getClass()) {
1827       return false;
1828     }
1829 
1830     HFileBlock castedComparison = (HFileBlock) comparison;
1831 
1832     if (castedComparison.blockType != this.blockType) {
1833       return false;
1834     }
1835     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1836       return false;
1837     }
1838     if (castedComparison.offset != this.offset) {
1839       return false;
1840     }
1841     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1842       return false;
1843     }
1844     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1845       return false;
1846     }
1847     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1848       return false;
1849     }
1850     if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1851         castedComparison.buf.limit()) != 0) {
1852       return false;
1853     }
1854     return true;
1855   }
1856 
1857   public DataBlockEncoding getDataBlockEncoding() {
1858     if (blockType == BlockType.ENCODED_DATA) {
1859       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1860     }
1861     return DataBlockEncoding.NONE;
1862   }
1863 
1864   byte getChecksumType() {
1865     return this.fileContext.getChecksumType().getCode();
1866   }
1867 
1868   int getBytesPerChecksum() {
1869     return this.fileContext.getBytesPerChecksum();
1870   }
1871 
1872   /** @return the size of data on disk + header. Excludes checksum. */
1873   int getOnDiskDataSizeWithHeader() {
1874     return this.onDiskDataSizeWithHeader;
1875   }
1876 
1877   /**
1878    * Calculate the number of bytes required to store all the checksums
1879    * for this block. Each checksum value is a 4 byte integer.
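        * For example, assuming one checksum value is stored for every bytesPerChecksum
        * bytes of on-disk data, a block whose on-disk data (including header) spans four
        * such chunks carries 4 * 4 = 16 checksum bytes.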
1880    */
1881   int totalChecksumBytes() {
1882     // If the hfile block has minorVersion 0, then there is no checksum
1883     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1884     // indicates that cached blocks do not have checksum data because
1885     // checksums were already validated when the block was read from disk.
1886     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1887       return 0;
1888     }
1889     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1890         this.fileContext.getBytesPerChecksum());
1891   }
1892 
1893   /**
1894    * Returns the size of this block header.
1895    */
1896   public int headerSize() {
1897     return headerSize(this.fileContext.isUseHBaseChecksum());
1898   }
1899 
1900   /**
1901    * Maps a minor version, expressed as whether HBase checksums are in use, to the size of the header.
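        * As toStringHeader() shows, the checksummed header additionally carries a 1-byte
        * checksum type, a 4-byte bytesPerChecksum and a 4-byte onDiskDataSizeWithHeader,
        * so HFILEBLOCK_HEADER_SIZE exceeds HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM by 9 bytes.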
1902    */
1903   public static int headerSize(boolean usesHBaseChecksum) {
1904     if (usesHBaseChecksum) {
1905       return HConstants.HFILEBLOCK_HEADER_SIZE;
1906     }
1907     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1908   }
1909 
1910   /**
1911    * Return the appropriate DUMMY_HEADER for the minor version
1912    */
1913   public byte[] getDummyHeaderForVersion() {
1914     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1915   }
1916 
1917   /**
1918    * Return the appropriate DUMMY_HEADER for the minor version
1919    */
1920   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1921     if (usesHBaseChecksum) {
1922       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1923     }
1924     return DUMMY_HEADER_NO_CHECKSUM;
1925   }
1926 
1927   /**
1928    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1929    * fileContext for the file from which this block's data was originally read.
1930    */
1931   public HFileContext getHFileContext() {
1932     return this.fileContext;
1933   }
1934 
1935   @Override
1936   public MemoryType getMemoryType() {
1937     return this.memType;
1938   }
1939 
1940   /**
1941    * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
1942    */
1943   public boolean usesSharedMemory() {
1944     return this.memType == MemoryType.SHARED;
1945   }
1946 
1947   /**
1948    * Convert the contents of the block header into a human readable string.
1949    * This is mostly helpful for debugging. This assumes that the block
1950    * has minor version &gt; 0.
1951    */
1952   static String toStringHeader(ByteBuff buf) throws IOException {
1953     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1954     buf.get(magicBuf);
1955     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1956     int compressedBlockSizeNoHeader = buf.getInt();
1957     int uncompressedBlockSizeNoHeader = buf.getInt();
1958     long prevBlockOffset = buf.getLong();
1959     byte cksumtype = buf.get();
1960     long bytesPerChecksum = buf.getInt();
1961     long onDiskDataSizeWithHeader = buf.getInt();
1962     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1963                    " blockType " + bt +
1964                    " compressedBlockSizeNoHeader " +
1965                    compressedBlockSizeNoHeader +
1966                    " uncompressedBlockSizeNoHeader " +
1967                    uncompressedBlockSizeNoHeader +
1968                    " prevBlockOffset " + prevBlockOffset +
1969                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1970                    " bytesPerChecksum " + bytesPerChecksum +
1971                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1972   }
1973 }