1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.nio.ByteBuffer;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.hadoop.fs.FSDataInputStream;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.fs.HFileSystem;
37 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
38 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44 import org.apache.hadoop.hbase.nio.ByteBuff;
45 import org.apache.hadoop.hbase.nio.MultiByteBuff;
46 import org.apache.hadoop.hbase.nio.SingleByteBuff;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.ChecksumType;
49 import org.apache.hadoop.hbase.util.ClassSize;
50 import org.apache.hadoop.io.IOUtils;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Preconditions;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @InterfaceAudience.Private
88 public class HFileBlock implements Cacheable {
89
90
91
92
93
94
/**
 * Threshold, counted in I/O operations, associated with checksum verification.
 * NOTE(review): the code that reads this constant is not visible in this part of the file.
 */
static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

/** {@code fillHeader} constructor argument: write the header fields into the buffer. */
public static final boolean FILL_HEADER = true;
/** {@code fillHeader} constructor argument: leave the header bytes of the buffer untouched. */
public static final boolean DONT_FILL_HEADER = false;

/** Size of a block header plus the size of a data block encoding id. */
public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
    + DataBlockEncoding.ID_SIZE;

/** Zero-filled array whose length is the size of a block header without checksum fields. */
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

/**
 * Estimated base heap size of the {@link MultiByteBuff} class; {@link #heapSize()} adds it to
 * the capacity of the block buffer.
 */
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
    new MultiByteBuff(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false);

/**
 * Number of metadata bytes that trail a serialized block: one byte, one int and one long.
 * The deserializer reads them back as the checksum flag, the file offset and the on-disk size
 * of the next block.
 */
public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
    + Bytes.SIZEOF_LONG;

/** Checksum size constant; equal to the size of an int. */
static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
122
123 static final CacheableDeserializer<Cacheable> blockDeserializer =
124 new CacheableDeserializer<Cacheable>() {
125 public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
126 throws IOException {
127 buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
128 ByteBuff newByteBuffer;
129 if (reuse) {
130 newByteBuffer = buf.slice();
131 } else {
132
133 int len = buf.limit();
134 newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len));
135 newByteBuffer.put(0, buf, buf.position(), len);
136 }
137 buf.position(buf.limit());
138 buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
139 boolean usesChecksum = buf.get() == (byte)1;
140 HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType);
141 hFileBlock.offset = buf.getLong();
142 hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
143 if (hFileBlock.hasNextBlockHeader()) {
144 hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
145 }
146 return hFileBlock;
147 }
148
149 @Override
150 public int getDeserialiserIdentifier() {
151 return deserializerIdentifier;
152 }
153
154 @Override
155 public HFileBlock deserialize(ByteBuff b) throws IOException {
156
157 return deserialize(b, false, MemoryType.EXCLUSIVE);
158 }
159 };
160 private static final int deserializerIdentifier;
161 static {
162 deserializerIdentifier = CacheableDeserializerIdManager
163 .registerDeserializer(blockDeserializer);
164 }
165
166
/** Type of this block. */
private BlockType blockType;

/** On-disk size of the block, header excluded (second header field). */
private int onDiskSizeWithoutHeader;

/** Uncompressed size of the block, header excluded (third header field). */
private final int uncompressedSizeWithoutHeader;

/** Offset of the previous block as recorded in the header. */
private final long prevBlockOffset;

/**
 * On-disk data size including the header. Read from the header when HBase checksums are used;
 * otherwise computed as the on-disk size plus the checksum-less header size.
 */
private final int onDiskDataSizeWithHeader;

/** Buffer holding the header followed by the block content. */
private ByteBuff buf;

/** File-level settings (checksum usage, checksum type, bytes per checksum, ...). */
private HFileContext fileContext;

/** Offset of this block in the file; negative means it has not been initialized. */
private long offset = -1;

/**
 * On-disk size, header included, of the block that follows this one. A value greater than
 * zero signals that the header of the next block is present; defaults to -1.
 */
private int nextBlockOnDiskSizeWithHeader = -1;

/** Memory type of the buffer backing this block; exclusive unless stated otherwise. */
private MemoryType memType = MemoryType.EXCLUSIVE;
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/**
 * Creates a block from explicit field values and an already populated buffer.
 *
 * @param blockType type of the block
 * @param onDiskSizeWithoutHeader on-disk size, header excluded
 * @param uncompressedSizeWithoutHeader uncompressed size, header excluded
 * @param prevBlockOffset offset of the previous block
 * @param buf buffer holding header and data; rewound before this constructor returns
 * @param fillHeader when true the header fields are written into {@code buf}
 * @param offset offset of the block in the file
 * @param onDiskDataSizeWithHeader on-disk data size, header included
 * @param fileContext file-level settings
 */
HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
    long prevBlockOffset, ByteBuff buf, boolean fillHeader, long offset,
    int onDiskDataSizeWithHeader, HFileContext fileContext) {
  this.fileContext = fileContext;
  this.buf = buf;
  this.blockType = blockType;
  this.offset = offset;
  this.prevBlockOffset = prevBlockOffset;
  this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
  this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
  this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
  if (fillHeader) {
    // All fields are set at this point, so the header can be serialized from them.
    overwriteHeader();
  }
  this.buf.rewind();
}
236
/**
 * Convenience constructor taking a {@link ByteBuffer}: wraps it in a {@link SingleByteBuff}
 * and delegates to the {@link ByteBuff} based constructor with otherwise identical arguments.
 */
HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
    long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
    int onDiskDataSizeWithHeader, HFileContext fileContext) {
  this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
      new SingleByteBuff(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext);
}
243
244
245
246
/**
 * Copy constructor. Every header-derived field is copied and the buffer is duplicated, so the
 * new block gets its own buffer object obtained from {@code that.buf.duplicate()}. The memory
 * type is not copied and keeps its default.
 *
 * @param that block to copy from
 */
HFileBlock(HFileBlock that) {
  // Identity and location of the block.
  this.blockType = that.blockType;
  this.offset = that.offset;
  this.prevBlockOffset = that.prevBlockOffset;
  this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
  // Sizes.
  this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
  this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
  this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
  // Shared context, duplicated buffer.
  this.fileContext = that.fileContext;
  this.buf = that.buf.duplicate();
}
258
/**
 * Creates a block from a {@link ByteBuffer} holding header and data by wrapping it in a
 * {@link SingleByteBuff} and delegating to the {@link ByteBuff} based constructor.
 */
HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
  this(new SingleByteBuff(b), usesHBaseChecksum);
}
262
263
264
265
266
267
268
/**
 * Creates a block from a buffer holding header and data, using
 * {@link MemoryType#EXCLUSIVE} as the memory type.
 */
HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
  this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE);
}
272
273
274
275
276
277
278
279 HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException {
280 b.rewind();
281 blockType = BlockType.read(b);
282 onDiskSizeWithoutHeader = b.getInt();
283 uncompressedSizeWithoutHeader = b.getInt();
284 prevBlockOffset = b.getLong();
285 HFileContextBuilder contextBuilder = new HFileContextBuilder();
286 contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
287 if (usesHBaseChecksum) {
288 contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
289 contextBuilder.withBytesPerCheckSum(b.getInt());
290 this.onDiskDataSizeWithHeader = b.getInt();
291 } else {
292 contextBuilder.withChecksumType(ChecksumType.NULL);
293 contextBuilder.withBytesPerCheckSum(0);
294 this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
295 HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
296 }
297 this.fileContext = contextBuilder.build();
298 this.memType = memType;
299 buf = b;
300 buf.rewind();
301 }
302
303 public BlockType getBlockType() {
304 return blockType;
305 }
306
307
308 public short getDataBlockEncodingId() {
309 if (blockType != BlockType.ENCODED_DATA) {
310 throw new IllegalArgumentException("Querying encoder ID of a block " +
311 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
312 }
313 return buf.getShort(headerSize());
314 }
315
316
317
318
319 public int getOnDiskSizeWithHeader() {
320 return onDiskSizeWithoutHeader + headerSize();
321 }
322
323
324
325
326 public int getOnDiskSizeWithoutHeader() {
327 return onDiskSizeWithoutHeader;
328 }
329
330
331
332
333 public int getUncompressedSizeWithoutHeader() {
334 return uncompressedSizeWithoutHeader;
335 }
336
337
338
339
340
341 public long getPrevBlockOffset() {
342 return prevBlockOffset;
343 }
344
345
346
347
348
349 private void overwriteHeader() {
350 buf.rewind();
351 blockType.write(buf);
352 buf.putInt(onDiskSizeWithoutHeader);
353 buf.putInt(uncompressedSizeWithoutHeader);
354 buf.putLong(prevBlockOffset);
355 if (this.fileContext.isUseHBaseChecksum()) {
356 buf.put(fileContext.getChecksumType().getCode());
357 buf.putInt(fileContext.getBytesPerChecksum());
358 buf.putInt(onDiskDataSizeWithHeader);
359 }
360 }
361
362
363
364
365
366
367 public ByteBuff getBufferWithoutHeader() {
368 ByteBuff dup = this.buf.duplicate();
369 dup.position(headerSize());
370 dup.limit(buf.limit() - totalChecksumBytes());
371 return dup.slice();
372 }
373
374
375
376
377
378
379
380
381
382
383 public ByteBuff getBufferReadOnly() {
384 ByteBuff dup = this.buf.duplicate();
385 dup.limit(buf.limit() - totalChecksumBytes());
386 return dup.slice();
387 }
388
389
390
391
392
393
394
395
396 public ByteBuff getBufferReadOnlyWithHeader() {
397 ByteBuff dup = this.buf.duplicate();
398 return dup.slice();
399 }
400
401
402
403
404
405
406
407 ByteBuff getBufferWithHeader() {
408 ByteBuff dupBuf = buf.duplicate();
409 dupBuf.rewind();
410 return dupBuf;
411 }
412
413 private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
414 String fieldName) throws IOException {
415 if (valueFromBuf != valueFromField) {
416 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
417 + ") is different from that in the field (" + valueFromField + ")");
418 }
419 }
420
421 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
422 throws IOException {
423 if (valueFromBuf != valueFromField) {
424 throw new IOException("Block type stored in the buffer: " +
425 valueFromBuf + ", block type field: " + valueFromField);
426 }
427 }
428
429
430
431
432
433
434
435
436 void sanityCheck() throws IOException {
437 buf.rewind();
438
439 sanityCheckAssertion(BlockType.read(buf), blockType);
440
441 sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
442 "onDiskSizeWithoutHeader");
443
444 sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
445 "uncompressedSizeWithoutHeader");
446
447 sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset");
448 if (this.fileContext.isUseHBaseChecksum()) {
449 sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
450 sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(),
451 "bytesPerChecksum");
452 sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
453 }
454
455 int cksumBytes = totalChecksumBytes();
456 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
457 if (buf.limit() != expectedBufLimit) {
458 throw new AssertionError("Expected buffer limit " + expectedBufLimit
459 + ", got " + buf.limit());
460 }
461
462
463
464 int hdrSize = headerSize();
465 if (buf.capacity() != expectedBufLimit &&
466 buf.capacity() != expectedBufLimit + hdrSize) {
467 throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
468 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
469 }
470 }
471
472 @Override
473 public String toString() {
474 StringBuilder sb = new StringBuilder()
475 .append("HFileBlock [")
476 .append(" fileOffset=").append(offset)
477 .append(" headerSize()=").append(headerSize())
478 .append(" blockType=").append(blockType)
479 .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
480 .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
481 .append(" prevBlockOffset=").append(prevBlockOffset)
482 .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
483 if (fileContext.isUseHBaseChecksum()) {
484 sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
485 .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
486 .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
487 } else {
488 sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
489 .append("(").append(onDiskSizeWithoutHeader)
490 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
491 }
492 String dataBegin = null;
493 if (buf.hasArray()) {
494 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
495 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
496 } else {
497 ByteBuff bufWithoutHeader = getBufferWithoutHeader();
498 byte[] dataBeginBytes = new byte[Math.min(32,
499 bufWithoutHeader.limit() - bufWithoutHeader.position())];
500 bufWithoutHeader.get(dataBeginBytes);
501 dataBegin = Bytes.toStringBinary(dataBeginBytes);
502 }
503 sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
504 .append(" totalChecksumBytes()=").append(totalChecksumBytes())
505 .append(" isUnpacked()=").append(isUnpacked())
506 .append(" buf=[ ").append(buf).append(" ]")
507 .append(" dataBeginsWith=").append(dataBegin)
508 .append(" fileContext=").append(fileContext)
509 .append(" ]");
510 return sb.toString();
511 }
512
513
514
515
516 private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
517 throws IOException {
518 if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
519 String dataBegin = null;
520 if (buf.hasArray()) {
521 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
522 } else {
523 ByteBuff bufDup = getBufferReadOnly();
524 byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
525 bufDup.get(dataBeginBytes);
526 dataBegin = Bytes.toStringBinary(dataBeginBytes);
527 }
528 String blockInfoMsg =
529 "Block offset: " + offset + ", data starts with: " + dataBegin;
530 throw new IOException("On-disk size without header provided is "
531 + expectedOnDiskSizeWithoutHeader + ", but block "
532 + "header contains " + onDiskSizeWithoutHeader + ". " +
533 blockInfoMsg);
534 }
535 }
536
537
538
539
540
541 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
542 if (!fileContext.isCompressedOrEncrypted()) {
543
544
545
546 return this;
547 }
548
549 HFileBlock unpacked = new HFileBlock(this);
550 unpacked.allocateBuffer();
551
552 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
553 reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
554
555 ByteBuff dup = this.buf.duplicate();
556 dup.position(this.headerSize());
557 dup = dup.slice();
558 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
559 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
560 dup);
561
562
563 if (unpacked.hasNextBlockHeader()) {
564
565
566
567
568 ByteBuff inDup = this.buf.duplicate();
569 inDup.limit(inDup.limit() + headerSize());
570 ByteBuff outDup = unpacked.buf.duplicate();
571 outDup.limit(outDup.limit() + unpacked.headerSize());
572 outDup.put(
573 unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
574 + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader,
575 unpacked.headerSize());
576 }
577 return unpacked;
578 }
579
580
581
582
583 private boolean hasNextBlockHeader() {
584 return nextBlockOnDiskSizeWithHeader > 0;
585 }
586
587
588
589
590
591
592 private void allocateBuffer() {
593 int cksumBytes = totalChecksumBytes();
594 int headerSize = headerSize();
595 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
596 cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
597
598
599 ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
600
601
602
603 buf.position(0);
604 buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
605
606 buf = new SingleByteBuff(newBuf);
607
608 buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
609 }
610
611
612
613
614
615 public boolean isUnpacked() {
616 final int cksumBytes = totalChecksumBytes();
617 final int headerSize = headerSize();
618 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
619 final int bufCapacity = buf.capacity();
620 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
621 }
622
623
624 public void assumeUncompressed() throws IOException {
625 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
626 totalChecksumBytes()) {
627 throw new IOException("Using no compression but "
628 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
629 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
630 + ", numChecksumbytes=" + totalChecksumBytes());
631 }
632 }
633
634
635
636
637
638 public void expectType(BlockType expectedType) throws IOException {
639 if (blockType != expectedType) {
640 throw new IOException("Invalid block type: expected=" + expectedType
641 + ", actual=" + blockType);
642 }
643 }
644
645
646 public long getOffset() {
647 if (offset < 0) {
648 throw new IllegalStateException(
649 "HFile block offset not initialized properly");
650 }
651 return offset;
652 }
653
654
655
656
657 public DataInputStream getByteStream() {
658 ByteBuff dup = this.buf.duplicate();
659 dup.position(this.headerSize());
660 return new DataInputStream(new ByteBuffInputStream(dup));
661 }
662
663 @Override
664 public long heapSize() {
665 long size = ClassSize.align(
666 ClassSize.OBJECT +
667
668 4 * ClassSize.REFERENCE +
669
670
671 4 * Bytes.SIZEOF_INT +
672
673 2 * Bytes.SIZEOF_LONG +
674
675 fileContext.heapSize()
676 );
677
678 if (buf != null) {
679
680 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
681 }
682
683 return ClassSize.align(size);
684 }
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701 public static boolean readWithExtra(InputStream in, byte[] buf,
702 int bufOffset, int necessaryLen, int extraLen) throws IOException {
703 int bytesRemaining = necessaryLen + extraLen;
704 while (bytesRemaining > 0) {
705 int ret = in.read(buf, bufOffset, bytesRemaining);
706 if (ret == -1 && bytesRemaining <= extraLen) {
707
708 break;
709 }
710
711 if (ret < 0) {
712 throw new IOException("Premature EOF from inputStream (read "
713 + "returned " + ret + ", was trying to read " + necessaryLen
714 + " necessary bytes and " + extraLen + " extra bytes, "
715 + "successfully read "
716 + (necessaryLen + extraLen - bytesRemaining));
717 }
718 bufOffset += ret;
719 bytesRemaining -= ret;
720 }
721 return bytesRemaining <= 0;
722 }
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741 @VisibleForTesting
742 static boolean positionalReadWithExtra(FSDataInputStream in,
743 long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
744 throws IOException {
745 int bytesRemaining = necessaryLen + extraLen;
746 int bytesRead = 0;
747 while (bytesRead < necessaryLen) {
748 int ret = in.read(position, buf, bufOffset, bytesRemaining);
749 if (ret < 0) {
750 throw new IOException("Premature EOF from inputStream (positional read "
751 + "returned " + ret + ", was trying to read " + necessaryLen
752 + " necessary bytes and " + extraLen + " extra bytes, "
753 + "successfully read " + bytesRead);
754 }
755 position += ret;
756 bufOffset += ret;
757 bytesRemaining -= ret;
758 bytesRead += ret;
759 }
760 return bytesRead != necessaryLen && bytesRemaining <= 0;
761 }
762
763
764
765
766
767 public int getNextBlockOnDiskSizeWithHeader() {
768 return nextBlockOnDiskSizeWithHeader;
769 }
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784 public static class Writer {
785
/**
 * Life cycle of the writer: {@code INIT} until the first block is started, {@code WRITING}
 * while a block accepts data, {@code BLOCK_READY} once the block has been finished.
 */
private enum State {
  INIT,
  WRITING,
  BLOCK_READY
};
791
792
/** Current life-cycle state of the writer. */
private State state = State.INIT;

/** Data block encoder; never null because the constructor substitutes a no-op encoder. */
private final HFileDataBlockEncoder dataBlockEncoder;

/** Encoding context used for {@code DATA} and {@code ENCODED_DATA} blocks. */
private HFileBlockEncodingContext dataBlockEncodingCtx;

/** Encoding context used for all other block types. */
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

/**
 * In-memory stream that receives a dummy header followed by the data of the block being
 * written; reset each time a new block is started.
 */
private ByteArrayOutputStream baosInMemory;

/**
 * Type of the block currently being written. For data blocks it is replaced by the block
 * type reported by the encoding context when the block is finished.
 */
private BlockType blockType;

/** Data output stream over {@link #baosInMemory} handed out to callers. */
private DataOutputStream userDataStream;

/** Sum of the sizes returned by the encoder for the cells written to the current block. */
private int unencodedDataSizeWritten;

/** Header and data as produced by the encoding context's compress-and-encrypt step. */
private byte[] onDiskBytesWithHeader;

/** Checksum bytes generated over {@link #onDiskBytesWithHeader}. */
private byte[] onDiskChecksum;

/** Header and data exactly as accumulated in memory, before compression and encryption. */
private byte[] uncompressedBytesWithHeader;

/** Stream position at which the current block was written, or -1 if not written yet. */
private long startOffset;

/**
 * For every block type id, the start offset of the most recently written block of that
 * type; -1 when there is none.
 */
private long[] prevOffsetByType;

/** Offset of the previous block of the current type; stored in the block header. */
private long prevOffset;

/** File-level settings such as checksum type and bytes per checksum. */
private HFileContext fileContext;
868
869
870
871
872 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
873 this.dataBlockEncoder = dataBlockEncoder != null
874 ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
875 defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
876 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
877 dataBlockEncodingCtx = this.dataBlockEncoder
878 .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
879
880 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
881 throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
882 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
883 fileContext.getBytesPerChecksum());
884 }
885
886 baosInMemory = new ByteArrayOutputStream();
887
888 prevOffsetByType = new long[BlockType.values().length];
889 for (int i = 0; i < prevOffsetByType.length; ++i)
890 prevOffsetByType[i] = -1;
891
892 this.fileContext = fileContext;
893 }
894
895
896
897
898
899
900
901 public DataOutputStream startWriting(BlockType newBlockType)
902 throws IOException {
903 if (state == State.BLOCK_READY && startOffset != -1) {
904
905
906 prevOffsetByType[blockType.getId()] = startOffset;
907 }
908
909 startOffset = -1;
910 blockType = newBlockType;
911
912 baosInMemory.reset();
913 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
914
915 state = State.WRITING;
916
917
918 userDataStream = new DataOutputStream(baosInMemory);
919 if (newBlockType == BlockType.DATA) {
920 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
921 }
922 this.unencodedDataSizeWritten = 0;
923 return userDataStream;
924 }
925
926
927
928
929
930
931 public void write(Cell cell) throws IOException{
932 expectState(State.WRITING);
933 this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
934 this.userDataStream);
935 }
936
937
938
939
940
941
942
943
944 DataOutputStream getUserDataStream() {
945 expectState(State.WRITING);
946 return userDataStream;
947 }
948
949
950
951
952
953 void ensureBlockReady() throws IOException {
954 Preconditions.checkState(state != State.INIT,
955 "Unexpected state: " + state);
956
957 if (state == State.BLOCK_READY)
958 return;
959
960
961 finishBlock();
962 }
963
964
965
966
967
968
969
970 private void finishBlock() throws IOException {
971 if (blockType == BlockType.DATA) {
972 BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
973 new BufferGrabbingByteArrayOutputStream();
974 baosInMemory.writeTo(baosInMemoryCopy);
975 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
976 baosInMemoryCopy.buf, blockType);
977 blockType = dataBlockEncodingCtx.getBlockType();
978 }
979 userDataStream.flush();
980
981 uncompressedBytesWithHeader = baosInMemory.toByteArray();
982 prevOffset = prevOffsetByType[blockType.getId()];
983
984
985
986
987 state = State.BLOCK_READY;
988 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
989 onDiskBytesWithHeader = dataBlockEncodingCtx
990 .compressAndEncrypt(uncompressedBytesWithHeader);
991 } else {
992 onDiskBytesWithHeader = defaultBlockEncodingCtx
993 .compressAndEncrypt(uncompressedBytesWithHeader);
994 }
995 int numBytes = (int) ChecksumUtil.numBytes(
996 onDiskBytesWithHeader.length,
997 fileContext.getBytesPerChecksum());
998
999
1000 putHeader(onDiskBytesWithHeader, 0,
1001 onDiskBytesWithHeader.length + numBytes,
1002 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1003
1004 putHeader(uncompressedBytesWithHeader, 0,
1005 onDiskBytesWithHeader.length + numBytes,
1006 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1007
1008 onDiskChecksum = new byte[numBytes];
1009 ChecksumUtil.generateChecksums(
1010 onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1011 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1012 }
1013
1014 public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1015 private byte[] buf;
1016
1017 @Override
1018 public void write(byte[] b, int off, int len) {
1019 this.buf = b;
1020 }
1021
1022 public byte[] getBuffer() {
1023 return this.buf;
1024 }
1025 }
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035 private void putHeader(byte[] dest, int offset, int onDiskSize,
1036 int uncompressedSize, int onDiskDataSize) {
1037 offset = blockType.put(dest, offset);
1038 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1039 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1040 offset = Bytes.putLong(dest, offset, prevOffset);
1041 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1042 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1043 Bytes.putInt(dest, offset, onDiskDataSize);
1044 }
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054 public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1055 long offset = out.getPos();
1056 if (startOffset != -1 && offset != startOffset) {
1057 throw new IOException("A " + blockType + " block written to a "
1058 + "stream twice, first at offset " + startOffset + ", then at "
1059 + offset);
1060 }
1061 startOffset = offset;
1062
1063 finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1064 }
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1076 throws IOException {
1077 ensureBlockReady();
1078 out.write(onDiskBytesWithHeader);
1079 out.write(onDiskChecksum);
1080 }
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092 byte[] getHeaderAndDataForTest() throws IOException {
1093 ensureBlockReady();
1094
1095
1096 byte[] output =
1097 new byte[onDiskBytesWithHeader.length
1098 + onDiskChecksum.length];
1099 System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1100 onDiskBytesWithHeader.length);
1101 System.arraycopy(onDiskChecksum, 0, output,
1102 onDiskBytesWithHeader.length, onDiskChecksum.length);
1103 return output;
1104 }
1105
1106
1107
1108
1109 public void release() {
1110 if (dataBlockEncodingCtx != null) {
1111 dataBlockEncodingCtx.close();
1112 dataBlockEncodingCtx = null;
1113 }
1114 if (defaultBlockEncodingCtx != null) {
1115 defaultBlockEncodingCtx.close();
1116 defaultBlockEncodingCtx = null;
1117 }
1118 }
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128 int getOnDiskSizeWithoutHeader() {
1129 expectState(State.BLOCK_READY);
1130 return onDiskBytesWithHeader.length
1131 + onDiskChecksum.length
1132 - HConstants.HFILEBLOCK_HEADER_SIZE;
1133 }
1134
1135
1136
1137
1138
1139
1140
1141
1142 int getOnDiskSizeWithHeader() {
1143 expectState(State.BLOCK_READY);
1144 return onDiskBytesWithHeader.length + onDiskChecksum.length;
1145 }
1146
1147
1148
1149
1150 int getUncompressedSizeWithoutHeader() {
1151 expectState(State.BLOCK_READY);
1152 return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1153 }
1154
1155
1156
1157
1158 int getUncompressedSizeWithHeader() {
1159 expectState(State.BLOCK_READY);
1160 return uncompressedBytesWithHeader.length;
1161 }
1162
1163
1164 public boolean isWriting() {
1165 return state == State.WRITING;
1166 }
1167
1168
1169
1170
1171
1172
1173
1174
1175 public int blockSizeWritten() {
1176 if (state != State.WRITING) return 0;
1177 return this.unencodedDataSizeWritten;
1178 }
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188 ByteBuffer getUncompressedBufferWithHeader() {
1189 expectState(State.BLOCK_READY);
1190 return ByteBuffer.wrap(uncompressedBytesWithHeader);
1191 }
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201 ByteBuffer getOnDiskBufferWithHeader() {
1202 expectState(State.BLOCK_READY);
1203 return ByteBuffer.wrap(onDiskBytesWithHeader);
1204 }
1205
1206 private void expectState(State expectedState) {
1207 if (state != expectedState) {
1208 throw new IllegalStateException("Expected state: " + expectedState +
1209 ", actual state: " + state);
1210 }
1211 }
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223 public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1224 throws IOException {
1225 bw.writeToBlock(startWriting(bw.getBlockType()));
1226 writeHeaderAndData(out);
1227 }
1228
1229
1230
1231
1232
1233
1234
1235
1236 public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1237 HFileContext newContext = new HFileContextBuilder()
1238 .withBlockSize(fileContext.getBlocksize())
1239 .withBytesPerCheckSum(0)
1240 .withChecksumType(ChecksumType.NULL)
1241 .withCompression(fileContext.getCompression())
1242 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1243 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1244 .withCompressTags(fileContext.isCompressTags())
1245 .withIncludesMvcc(fileContext.isIncludesMvcc())
1246 .withIncludesTags(fileContext.isIncludesTags())
1247 .build();
1248 return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1249 getUncompressedSizeWithoutHeader(), prevOffset,
1250 cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1251 getOnDiskBufferWithHeader() :
1252 getUncompressedBufferWithHeader(),
1253 FILL_HEADER, startOffset,
1254 onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1255 }
1256 }
1257
1258
/** Something that can be written into a block by {@code Writer.writeBlock}. */
public interface BlockWritable {

  /** @return the type of block this object is written as */
  BlockType getBlockType();

  /**
   * Writes the content of this object to the given output; {@code Writer.writeBlock} passes
   * the stream of a freshly started block.
   *
   * @param out destination of the block content
   */
  void writeToBlock(DataOutput out) throws IOException;
}
1272
1273
1274
1275
/** Iterator over a sequence of blocks. */
public interface BlockIterator {

  /**
   * @return the next block; the implementation visible in this file returns {@code null}
   *         once the end offset of the range has been reached
   */
  HFileBlock nextBlock() throws IOException;

  /**
   * Like {@link #nextBlock()}, but the block is expected to be of the given type; the
   * implementation visible in this file throws an {@link IOException} otherwise.
   *
   * @param blockType expected type of the next block
   */
  HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
}
1289
1290
/** Reader of blocks stored in a file. */
public interface FSReader {

  /**
   * Reads the block found at the given offset.
   * NOTE(review): the block iterator in this file passes -1 for both sizes and {@code false}
   * for {@code pread}; the precise meaning of these arguments is defined by the implementation,
   * which is not visible here.
   *
   * @param offset offset of the block in the file
   * @param onDiskSize on-disk size of the block, or -1
   * @param uncompressedSize uncompressed size of the block, or -1
   * @param pread presumably selects positional reads - confirm against the implementation
   * @return the block that was read
   */
  HFileBlock readBlockData(long offset, long onDiskSize,
      int uncompressedSize, boolean pread) throws IOException;

  /**
   * Creates an iterator over the blocks located from {@code startOffset} up to, but not
   * including, {@code endOffset}.
   *
   * @param startOffset offset of the first block
   * @param endOffset offset at which iteration stops
   * @return an iterator over the blocks of the range
   */
  BlockIterator blockRange(long startOffset, long endOffset);

  /** Closes the streams used by this reader. */
  void closeStreams() throws IOException;

  /** @return the decoding context that {@code unpack} uses for {@code ENCODED_DATA} blocks */
  HFileBlockDecodingContext getBlockDecodingContext();

  /** @return the decoding context that {@code unpack} uses for all other blocks */
  HFileBlockDecodingContext getDefaultBlockDecodingContext();

  /** Sets the includes-memstore-timestamp flag of this reader. */
  void setIncludesMemstoreTS(boolean includesMemstoreTS);
  /** Sets the data block encoder of this reader. */
  void setDataBlockEncoder(HFileDataBlockEncoder encoder);
}
1330
1331
1332
1333
1334
/**
 * Holder for the bytes of one block header together with an offset.
 * NOTE(review): presumably caches the header of the block that follows the one just read;
 * the code that fills and consumes it is not visible in this part of the file.
 */
private static class PrefetchedHeader {
  /** Offset the stored header belongs to; -1 while nothing is stored. */
  long offset = -1;
  /** Header bytes; sized to hold exactly one block header. */
  byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
  /** Buffer view over the whole {@link #header} array. */
  final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
}
1340
1341
1342 static class FSReaderImpl implements FSReader {
1343
1344
/** Wrapper around the input stream(s) blocks are read from. */
protected FSDataInputStreamWrapper streamWrapper;

/** Decoding context for encoded blocks; starts out as the default decoding context. */
private HFileBlockDecodingContext encodedBlockDecodingCtx;

/** Default decoding context built from the file context. */
private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

/** Per-thread {@link PrefetchedHeader}, created lazily on first access by each thread. */
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
    new ThreadLocal<PrefetchedHeader>() {
      @Override
      public PrefetchedHeader initialValue() {
        return new PrefetchedHeader();
      }
    };

/** Size of the file as supplied to the constructor. */
protected long fileSize;

/** Block header size, derived from whether HBase checksums are in use. */
protected final int hdrSize;

/** File system handle supplied to the constructor; may be null. */
protected HFileSystem hfs;

/** Path of the file supplied to the constructor; may be null. */
protected Path path;

/** Lock associated with the stream. NOTE(review): its users are not visible here. */
private final Lock streamLock = new ReentrantLock();

/** Default buffer size: 1 << 20 bytes. */
public static final int DEFAULT_BUFFER_SIZE = 1 << 20;

/** File-level settings of the file being read. */
protected HFileContext fileContext;
1380
1381 public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1382 HFileContext fileContext) throws IOException {
1383 this.fileSize = fileSize;
1384 this.hfs = hfs;
1385 this.path = path;
1386 this.fileContext = fileContext;
1387 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1388
1389 this.streamWrapper = stream;
1390
1391 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1392 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1393 encodedBlockDecodingCtx = defaultDecodingCtx;
1394 }
1395
1396
1397
1398
1399
/**
 * Convenience constructor for a bare input stream: wraps it and delegates to
 * the main constructor with no file system and no path.
 *
 * @param istream the stream to read from
 * @param fileSize size of the file
 * @param fileContext settings of the file
 * @throws IOException if the main constructor fails
 */
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
    throws IOException {
  this(new FSDataInputStreamWrapper(istream), fileSize, (HFileSystem) null, (Path) null,
      fileContext);
}
1404
1405 public BlockIterator blockRange(final long startOffset, final long endOffset) {
1406 final FSReader owner = this;
1407 return new BlockIterator() {
1408 private long offset = startOffset;
1409
1410 @Override
1411 public HFileBlock nextBlock() throws IOException {
1412 if (offset >= endOffset)
1413 return null;
1414 HFileBlock b = readBlockData(offset, -1, -1, false);
1415 offset += b.getOnDiskSizeWithHeader();
1416 return b.unpack(fileContext, owner);
1417 }
1418
1419 @Override
1420 public HFileBlock nextBlockWithBlockType(BlockType blockType)
1421 throws IOException {
1422 HFileBlock blk = nextBlock();
1423 if (blk.getBlockType() != blockType) {
1424 throw new IOException("Expected block of type " + blockType
1425 + " but found " + blk.getBlockType());
1426 }
1427 return blk;
1428 }
1429 };
1430 }
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447 protected int readAtOffset(FSDataInputStream istream,
1448 byte[] dest, int destOffset, int size,
1449 boolean peekIntoNextBlock, long fileOffset, boolean pread)
1450 throws IOException {
1451 if (peekIntoNextBlock &&
1452 destOffset + size + hdrSize > dest.length) {
1453
1454
1455 throw new IOException("Attempted to read " + size + " bytes and " +
1456 hdrSize + " bytes of next header into a " + dest.length +
1457 "-byte array at offset " + destOffset);
1458 }
1459
1460 if (!pread && streamLock.tryLock()) {
1461
1462 try {
1463 istream.seek(fileOffset);
1464
1465 long realOffset = istream.getPos();
1466 if (realOffset != fileOffset) {
1467 throw new IOException("Tried to seek to " + fileOffset + " to "
1468 + "read " + size + " bytes, but pos=" + realOffset
1469 + " after seek");
1470 }
1471
1472 if (!peekIntoNextBlock) {
1473 IOUtils.readFully(istream, dest, destOffset, size);
1474 return -1;
1475 }
1476
1477
1478 if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1479 return -1;
1480 } finally {
1481 streamLock.unlock();
1482 }
1483 } else {
1484
1485 int extraSize = peekIntoNextBlock ? hdrSize : 0;
1486 if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1487 size, extraSize)) {
1488 return -1;
1489 }
1490 }
1491
1492 assert peekIntoNextBlock;
1493 return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1494 }
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507 @Override
1508 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1509 int uncompressedSize, boolean pread)
1510 throws IOException {
1511
1512
1513
1514
1515
1516
1517 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1518 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1519
1520 HFileBlock blk = readBlockDataInternal(is, offset,
1521 onDiskSizeWithHeaderL,
1522 uncompressedSize, pread,
1523 doVerificationThruHBaseChecksum);
1524 if (blk == null) {
1525 HFile.LOG.warn("HBase checksum verification failed for file " +
1526 path + " at offset " +
1527 offset + " filesize " + fileSize +
1528 ". Retrying read with HDFS checksums turned on...");
1529
1530 if (!doVerificationThruHBaseChecksum) {
1531 String msg = "HBase checksum verification failed for file " +
1532 path + " at offset " +
1533 offset + " filesize " + fileSize +
1534 " but this cannot happen because doVerify is " +
1535 doVerificationThruHBaseChecksum;
1536 HFile.LOG.warn(msg);
1537 throw new IOException(msg);
1538 }
1539 HFile.checksumFailures.incrementAndGet();
1540
1541
1542
1543
1544
1545
1546
1547 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1548 doVerificationThruHBaseChecksum = false;
1549 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1550 uncompressedSize, pread,
1551 doVerificationThruHBaseChecksum);
1552 if (blk != null) {
1553 HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1554 path + " at offset " +
1555 offset + " filesize " + fileSize);
1556 }
1557 }
1558 if (blk == null && !doVerificationThruHBaseChecksum) {
1559 String msg = "readBlockData failed, possibly due to " +
1560 "checksum verification failed for file " + path +
1561 " at offset " + offset + " filesize " + fileSize;
1562 HFile.LOG.warn(msg);
1563 throw new IOException(msg);
1564 }
1565
1566
1567
1568
1569
1570
1571
1572
1573 streamWrapper.checksumOk();
1574 return blk;
1575 }
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590 private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1591 long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1592 boolean verifyChecksum)
1593 throws IOException {
1594 if (offset < 0) {
1595 throw new IOException("Invalid offset=" + offset + " trying to read "
1596 + "block (onDiskSize=" + onDiskSizeWithHeaderL
1597 + ", uncompressedSize=" + uncompressedSize + ")");
1598 }
1599
1600 if (uncompressedSize != -1) {
1601 throw new IOException("Version 2 block reader API does not need " +
1602 "the uncompressed size parameter");
1603 }
1604
1605 if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1606 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1607 throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1608 + ": expected to be at least " + hdrSize
1609 + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1610 + offset + ", uncompressedSize=" + uncompressedSize + ")");
1611 }
1612
1613 int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1614
1615
1616
1617
1618
1619
1620
1621 PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1622 ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1623
1624
1625 int nextBlockOnDiskSize = 0;
1626 byte[] onDiskBlock = null;
1627
1628 HFileBlock b = null;
1629 if (onDiskSizeWithHeader > 0) {
1630
1631
1632
1633
1634
1635
1636
1637
1638 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1639 onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1640
1641 nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1642 preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1643 true, offset + preReadHeaderSize, pread);
1644 if (headerBuf != null) {
1645
1646
1647
1648 assert headerBuf.hasArray();
1649 System.arraycopy(headerBuf.array(),
1650 headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1651 } else {
1652 headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1653 }
1654
1655 try {
1656
1657 b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1658 } catch (IOException ex) {
1659
1660 throw new IOException("Failed to read compressed block at "
1661 + offset
1662 + ", onDiskSizeWithoutHeader="
1663 + onDiskSizeWithHeader
1664 + ", preReadHeaderSize="
1665 + hdrSize
1666 + ", header.length="
1667 + prefetchedHeader.header.length
1668 + ", header bytes: "
1669 + Bytes.toStringBinary(prefetchedHeader.header, 0,
1670 hdrSize), ex);
1671 }
1672
1673 int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1674 assert onDiskSizeWithoutHeader >= 0;
1675 b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1676 } else {
1677
1678
1679
1680
1681
1682
1683
1684
1685 if (headerBuf == null) {
1686
1687
1688
1689
1690
1691 headerBuf = ByteBuffer.allocate(hdrSize);
1692
1693 readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1694 hdrSize, false, offset, pread);
1695 }
1696
1697 b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1698 onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1699
1700 System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1701 nextBlockOnDiskSize =
1702 readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1703 - hdrSize, true, offset + hdrSize, pread);
1704 onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1705 }
1706
1707 if (!fileContext.isCompressedOrEncrypted()) {
1708 b.assumeUncompressed();
1709 }
1710
1711 if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1712 return null;
1713 }
1714
1715
1716
1717
1718
1719 b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1720 this.fileContext.isUseHBaseChecksum());
1721
1722 b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1723
1724
1725 if (b.hasNextBlockHeader()) {
1726 prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1727 System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1728 }
1729
1730 b.offset = offset;
1731 b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1732 b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1733 return b;
1734 }
1735
1736 public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1737 this.fileContext.setIncludesMvcc(includesMemstoreTS);
1738 }
1739
1740 public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1741 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1742 }
1743
1744 @Override
1745 public HFileBlockDecodingContext getBlockDecodingContext() {
1746 return this.encodedBlockDecodingCtx;
1747 }
1748
1749 @Override
1750 public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1751 return this.defaultDecodingCtx;
1752 }
1753
1754
1755
1756
1757
1758
1759
1760 protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize)
1761 throws IOException {
1762 return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
1763 }
1764
1765 @Override
1766 public void closeStreams() throws IOException {
1767 streamWrapper.close();
1768 }
1769
1770 @Override
1771 public String toString() {
1772 return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1773 }
1774 }
1775
1776 @Override
1777 public int getSerializedLength() {
1778 if (buf != null) {
1779
1780 int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1781 return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1782 }
1783 return 0;
1784 }
1785
1786 @Override
1787 public void serialize(ByteBuffer destination) {
1788 this.buf.get(destination, 0, getSerializedLength()
1789 - EXTRA_SERIALIZATION_SPACE);
1790 serializeExtraInfo(destination);
1791 }
1792
1793 public void serializeExtraInfo(ByteBuffer destination) {
1794 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1795 destination.putLong(this.offset);
1796 destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1797 destination.rewind();
1798 }
1799
1800 @Override
1801 public CacheableDeserializer<Cacheable> getDeserializer() {
1802 return HFileBlock.blockDeserializer;
1803 }
1804
1805 @Override
1806 public int hashCode() {
1807 int result = 1;
1808 result = result * 31 + blockType.hashCode();
1809 result = result * 31 + nextBlockOnDiskSizeWithHeader;
1810 result = result * 31 + (int) (offset ^ (offset >>> 32));
1811 result = result * 31 + onDiskSizeWithoutHeader;
1812 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1813 result = result * 31 + uncompressedSizeWithoutHeader;
1814 result = result * 31 + buf.hashCode();
1815 return result;
1816 }
1817
1818 @Override
1819 public boolean equals(Object comparison) {
1820 if (this == comparison) {
1821 return true;
1822 }
1823 if (comparison == null) {
1824 return false;
1825 }
1826 if (comparison.getClass() != this.getClass()) {
1827 return false;
1828 }
1829
1830 HFileBlock castedComparison = (HFileBlock) comparison;
1831
1832 if (castedComparison.blockType != this.blockType) {
1833 return false;
1834 }
1835 if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1836 return false;
1837 }
1838 if (castedComparison.offset != this.offset) {
1839 return false;
1840 }
1841 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1842 return false;
1843 }
1844 if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1845 return false;
1846 }
1847 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1848 return false;
1849 }
1850 if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1851 castedComparison.buf.limit()) != 0) {
1852 return false;
1853 }
1854 return true;
1855 }
1856
1857 public DataBlockEncoding getDataBlockEncoding() {
1858 if (blockType == BlockType.ENCODED_DATA) {
1859 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1860 }
1861 return DataBlockEncoding.NONE;
1862 }
1863
1864 byte getChecksumType() {
1865 return this.fileContext.getChecksumType().getCode();
1866 }
1867
1868 int getBytesPerChecksum() {
1869 return this.fileContext.getBytesPerChecksum();
1870 }
1871
1872
1873 int getOnDiskDataSizeWithHeader() {
1874 return this.onDiskDataSizeWithHeader;
1875 }
1876
1877
1878
1879
1880
1881 int totalChecksumBytes() {
1882
1883
1884
1885
1886 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1887 return 0;
1888 }
1889 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1890 this.fileContext.getBytesPerChecksum());
1891 }
1892
1893
1894
1895
1896 public int headerSize() {
1897 return headerSize(this.fileContext.isUseHBaseChecksum());
1898 }
1899
1900
1901
1902
1903 public static int headerSize(boolean usesHBaseChecksum) {
1904 if (usesHBaseChecksum) {
1905 return HConstants.HFILEBLOCK_HEADER_SIZE;
1906 }
1907 return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1908 }
1909
1910
1911
1912
1913 public byte[] getDummyHeaderForVersion() {
1914 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1915 }
1916
1917
1918
1919
1920 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1921 if (usesHBaseChecksum) {
1922 return HConstants.HFILEBLOCK_DUMMY_HEADER;
1923 }
1924 return DUMMY_HEADER_NO_CHECKSUM;
1925 }
1926
1927
1928
1929
1930
1931 public HFileContext getHFileContext() {
1932 return this.fileContext;
1933 }
1934
1935 @Override
1936 public MemoryType getMemoryType() {
1937 return this.memType;
1938 }
1939
1940
1941
1942
1943 public boolean usesSharedMemory() {
1944 return this.memType == MemoryType.SHARED;
1945 }
1946
1947
1948
1949
1950
1951
1952 static String toStringHeader(ByteBuff buf) throws IOException {
1953 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1954 buf.get(magicBuf);
1955 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1956 int compressedBlockSizeNoHeader = buf.getInt();
1957 int uncompressedBlockSizeNoHeader = buf.getInt();
1958 long prevBlockOffset = buf.getLong();
1959 byte cksumtype = buf.get();
1960 long bytesPerChecksum = buf.getInt();
1961 long onDiskDataSizeWithHeader = buf.getInt();
1962 return " Header dump: magic: " + Bytes.toString(magicBuf) +
1963 " blockType " + bt +
1964 " compressedBlockSizeNoHeader " +
1965 compressedBlockSizeNoHeader +
1966 " uncompressedBlockSizeNoHeader " +
1967 uncompressedBlockSizeNoHeader +
1968 " prevBlockOffset " + prevBlockOffset +
1969 " checksumType " + ChecksumType.codeToType(cksumtype) +
1970 " bytesPerChecksum " + bytesPerChecksum +
1971 " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1972 }
1973 }