1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellComparator;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
43  import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.io.HeapSize;
46  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
47  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
48  import org.apache.hadoop.hbase.nio.ByteBuff;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.hbase.util.ObjectIntPair;
52  import org.apache.hadoop.io.WritableUtils;
53  import org.apache.hadoop.util.StringUtils;
54  
55  /**
56   * Provides functionality to write ({@link BlockIndexWriter}) and read
57   * ({@link BlockIndexReader})
58   * single-level and multi-level block indexes.
59   *
60   * Examples of how to use the block index writer can be found in
61   * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
62   *  {@link HFileWriterImpl}. Examples of how to use the reader can be
63   *  found in {@link HFileReaderImpl} and
64   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
65   */
66  @InterfaceAudience.Private
67  public class HFileBlockIndex {
68  
69    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
70  
71    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
72  
73    /**
74    * The maximum size guideline for index blocks (leaf, intermediate, and
75     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
76     */
77    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
78  
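  // A hedged usage sketch (not part of this class): the chunk-size guideline above can be
  // tuned through the configuration before HFiles are written; the HFile writer is expected
  // to pick it up when it constructs its BlockIndexWriter. The 64 KB value below is only an
  // illustrative choice.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024); // default: DEFAULT_MAX_CHUNK_SIZE
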
79    /**
80     * The number of bytes stored in each "secondary index" entry in addition to
81     * key bytes in the non-root index block format. The first long is the file
82     * offset of the deeper-level block the entry points to, and the int that
83     * follows is that block's on-disk size without including header.
84     */
85    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
86        + Bytes.SIZEOF_LONG;
87  
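  // Layout illustration derived from the constant above: in the non-root format each index
  // entry (pointed at by the secondary index) is laid out as
  //   long   file offset of the deeper-level block   (8 bytes)
  //   int    on-disk size of that block, no header   (4 bytes)
  //   byte[] first key of that block                 (variable)
  // so SECONDARY_INDEX_ENTRY_OVERHEAD = 8 + 4 = 12 bytes precede every key.
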
88    /**
89     * Error message when trying to use inline block API in single-level mode.
90     */
91    private static final String INLINE_BLOCKS_NOT_ALLOWED =
92        "Inline blocks are not allowed in the single-level-only mode";
93  
94    /**
95     * The size of a meta-data record used for finding the mid-key in a
96     * multi-level index. Consists of the middle leaf-level index block offset
97     * (long), its on-disk size without header included (int), and the mid-key
98     * entry's zero-based index in that leaf index block.
99     */
100   private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
101       2 * Bytes.SIZEOF_INT;
102 
103   /**
104    * An implementation of {@link BlockIndexReader} that deals with block keys which are plain
105    * byte[] arrays, such as the meta block index or the bloom block index for ROW blooms.
106    * It does not need a CellComparator; Bytes.BYTES_RAWCOMPARATOR ordering is sufficient.
107    */
108    static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
109 
110     private byte[][] blockKeys;
111 
112     public ByteArrayKeyBlockIndexReader(final int treeLevel,
113         final CachingBlockReader cachingBlockReader) {
114       this(treeLevel);
115       this.cachingBlockReader = cachingBlockReader;
116     }
117 
118     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
119       // Can be null for METAINDEX block
120       searchTreeLevel = treeLevel;
121     }
122 
123     protected long calculateHeapSizeForBlockKeys(long heapSize) {
124       // Calculating the size of blockKeys
125       if (blockKeys != null) {
126         heapSize += ClassSize.REFERENCE;
127         // Adding array + references overhead
128         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
129 
130         // Adding bytes
131         for (byte[] key : blockKeys) {
132           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
133         }
134       }
135       return heapSize;
136     }
137 
138     @Override
139     public boolean isEmpty() {
140       return blockKeys.length == 0;
141     }
142 
143     /**
144      * @param i
145      *          from 0 to {@link #getRootBlockCount()} - 1
146      */
147     public byte[] getRootBlockKey(int i) {
148       return blockKeys[i];
149     }
150 
151     @Override
152     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
153         boolean cacheBlocks, boolean pread, boolean isCompaction,
154         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
155       // Not needed for the byte[]-keyed index reader
156       return null;
157     }
158 
159     @Override
160     public Cell midkey() throws IOException {
161       // Not needed here
162       return null;
163     }
164 
165     protected void initialize(int numEntries) {
166       blockKeys = new byte[numEntries][];
167     }
168 
169     protected void add(final byte[] key, final long offset, final int dataSize) {
170       blockOffsets[rootCount] = offset;
171       blockKeys[rootCount] = key;
172       blockDataSizes[rootCount] = dataSize;
173       rootCount++;
174     }
175 
176     @Override
177     public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
178       int pos = Bytes.binarySearch(blockKeys, key, offset, length);
179       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
180       // binarySearch's javadoc.
181 
182       if (pos >= 0) {
183         // This means this is an exact match with an element of blockKeys.
184         assert pos < blockKeys.length;
185         return pos;
186       }
187 
188       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
189       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
190       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
191       // key < blockKeys[0], meaning the file does not contain the given key.
192 
193       int i = -pos - 1;
194       assert 0 <= i && i <= blockKeys.length;
195       return i - 1;
196     }
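    // Worked example with hypothetical keys: if blockKeys = {"b", "d", "f"}, searching
    // for "d" gives pos = 1 and we return 1; searching for "e" gives pos = -(2 + 1) = -3,
    // so i = 2 and we return i - 1 = 1 (the block whose first key "d" <= "e" < "f");
    // searching for "a" gives pos = -1, i = 0, and we return -1 (key precedes the file).
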
197 
198     @Override
199     public int rootBlockContainingKey(Cell key) {
200       // Should not be called on this because here it deals only with byte[]
201       throw new UnsupportedOperationException(
202           "Cannot search for a key that is of Cell type. Only plain byte array keys " +
203           "can be searched for");
204     }
205 
206     @Override
207     public String toString() {
208       StringBuilder sb = new StringBuilder();
209       sb.append("size=" + rootCount).append("\n");
210       for (int i = 0; i < rootCount; i++) {
211         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
212             .append("\n  offset=").append(blockOffsets[i])
213             .append(", dataSize=" + blockDataSizes[i]).append("\n");
214       }
215       return sb.toString();
216     }
217 
218   }
219 
220   /**
221    * An implementation of {@link BlockIndexReader} that deals with block keys which are the key
222    * part of a cell, such as the data block index or the ROW_COL bloom block index.
223    * It needs a CellComparator to work with the cells.
224    */
225    static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
226 
227     private Cell[] blockKeys;
228     /** Pre-computed mid-key */
229     private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
230     /** Needed for doing lookups on blocks. */
231     private CellComparator comparator;
232 
233     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
234         final CachingBlockReader cachingBlockReader) {
235       this(c, treeLevel);
236       this.cachingBlockReader = cachingBlockReader;
237     }
238 
239     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
240       // Can be null for METAINDEX block
241       comparator = c;
242       searchTreeLevel = treeLevel;
243     }
244 
245     protected long calculateHeapSizeForBlockKeys(long heapSize) {
246       if (blockKeys != null) {
247         heapSize += ClassSize.REFERENCE;
248         // Adding array + references overhead
249         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
250 
251         // Adding blockKeys
252         for (Cell key : blockKeys) {
253           heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
254         }
255       }
256       // Add comparator and the midkey atomicreference
257       heapSize += 2 * ClassSize.REFERENCE;
258       return heapSize;
259     }
260 
261     @Override
262     public boolean isEmpty() {
263       return blockKeys.length == 0;
264     }
265 
266     /**
267      * @param i
268      *          from 0 to {@link #getRootBlockCount()} - 1
269      */
270     public Cell getRootBlockKey(int i) {
271       return blockKeys[i];
272     }
273     @Override
274     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
275         boolean cacheBlocks, boolean pread, boolean isCompaction,
276         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
277       int rootLevelIndex = rootBlockContainingKey(key);
278       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
279         return null;
280       }
281 
282       // the next indexed key
283       Cell nextIndexedKey = null;
284 
285       // Read the next-level (intermediate or leaf) index block.
286       long currentOffset = blockOffsets[rootLevelIndex];
287       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
288 
289       if (rootLevelIndex < blockKeys.length - 1) {
290         nextIndexedKey = blockKeys[rootLevelIndex + 1];
291       } else {
292         nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
293       }
294 
295       int lookupLevel = 1; // How many levels deep we are in our lookup.
296       int index = -1;
297 
298       HFileBlock block = null;
299       boolean dataBlock = false;
300       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
301       while (true) {
302         try {
303           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
304             // Avoid reading the same block again, even with caching turned off.
305             // This is crucial for compaction-type workload which might have
306             // caching turned off. This is like a one-block cache inside the
307             // scanner.
308             block = currentBlock;
309           } else {
310             // Call HFile's caching block reader API. We always cache index
311             // blocks, otherwise we might get terrible performance.
312             boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
313             BlockType expectedBlockType;
314             if (lookupLevel < searchTreeLevel - 1) {
315               expectedBlockType = BlockType.INTERMEDIATE_INDEX;
316             } else if (lookupLevel == searchTreeLevel - 1) {
317               expectedBlockType = BlockType.LEAF_INDEX;
318             } else {
319               // this also accounts for ENCODED_DATA
320               expectedBlockType = BlockType.DATA;
321             }
322             block =
323                 cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
324                   isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
325           }
326 
327           if (block == null) {
328             throw new IOException("Failed to read block at offset " + currentOffset
329                 + ", onDiskSize=" + currentOnDiskSize);
330           }
331 
332           // Found a data block, break the loop and check our level in the tree.
333           if (block.getBlockType().isData()) {
334             dataBlock = true;
335             break;
336           }
337 
338           // Not a data block. This must be a leaf-level or intermediate-level
339           // index block. We don't allow going deeper than searchTreeLevel.
340           if (++lookupLevel > searchTreeLevel) {
341             throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
342                 + ", searchTreeLevel=" + searchTreeLevel);
343           }
344 
345           // Locate the entry corresponding to the given key in the non-root
346           // (leaf or intermediate-level) index block.
347           ByteBuff buffer = block.getBufferWithoutHeader();
348           index = locateNonRootIndexEntry(buffer, key, comparator);
349           if (index == -1) {
350             // This has to be changed
351             // For now change this to key value
352             throw new IOException("The key "
353                 + CellUtil.getCellKeyAsString(key)
354                 + " is before the" + " first key of the non-root index block " + block);
355           }
356 
357           currentOffset = buffer.getLong();
358           currentOnDiskSize = buffer.getInt();
359 
360           // Only update next indexed key if there is a next indexed key in the current level
361           byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
362           if (nonRootIndexedKey != null) {
363             tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
364             nextIndexedKey = tmpNextIndexKV;
365           }
366         } finally {
367           if (!dataBlock) {
368             // Return the block immediately if it is not the
369             // data block
370             cachingBlockReader.returnBlock(block);
371           }
372         }
373       }
374 
375       if (lookupLevel != searchTreeLevel) {
376         assert dataBlock == true;
377         // We reached a data block at an unexpected level, so return the block
378         // to the caching reader so that its ref count can be decremented
379         // before throwing.
380         cachingBlockReader.returnBlock(block);
381         throw new IOException("Reached a data block at level " + lookupLevel +
382             " but the number of levels is " + searchTreeLevel);
383       }
384 
385       // set the next indexed key for the current block.
386       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
387       return blockWithScanInfo;
388     }
389 
390     @Override
391     public Cell midkey() throws IOException {
392       if (rootCount == 0)
393         throw new IOException("HFile empty");
394 
395       Cell targetMidKey = this.midKey.get();
396       if (targetMidKey != null) {
397         return targetMidKey;
398       }
399 
400       if (midLeafBlockOffset >= 0) {
401         if (cachingBlockReader == null) {
402           throw new IOException("Have to read the middle leaf block but " +
403               "no block reader available");
404         }
405 
406         // Caching, using pread, assuming this is not a compaction.
407         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
408             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
409             BlockType.LEAF_INDEX, null);
410         try {
411           ByteBuff b = midLeafBlock.getBufferWithoutHeader();
412           int numDataBlocks = b.getIntAfterPosition(0);
413           int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
414           int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
415           int keyOffset =
416               Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
417                   + SECONDARY_INDEX_ENTRY_OVERHEAD;
418           byte[] bytes = b.toBytes(keyOffset, keyLen);
419           targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
420         } finally {
421           cachingBlockReader.returnBlock(midLeafBlock);
422         }
423       } else {
424         // The middle of the root-level index.
425         targetMidKey = blockKeys[rootCount / 2];
426       }
427 
428       this.midKey.set(targetMidKey);
429       return targetMidKey;
430     }
431 
432     protected void initialize(int numEntries) {
433       blockKeys = new Cell[numEntries];
434     }
435 
436     /**
437      * Adds a new entry in the root block index. Only used when reading.
438      *
439      * @param key Last key in the block
440      * @param offset file offset where the block is stored
441      * @param dataSize the uncompressed data size
442      */
443     protected void add(final byte[] key, final long offset, final int dataSize) {
444       blockOffsets[rootCount] = offset;
445       // Create the blockKeys as Cells once when the reader is opened
446       blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
447       blockDataSizes[rootCount] = dataSize;
448       rootCount++;
449     }
450 
451     @Override
452     public int rootBlockContainingKey(final byte[] key, int offset, int length,
453         CellComparator comp) {
454       // This should always be called with a Cell, not with a byte[] key
455       throw new UnsupportedOperationException("Cannot search for a plain byte array key. " +
456           "Only Cell-based keys can be searched for");
457     }
458 
459     @Override
460     public int rootBlockContainingKey(Cell key) {
461       // Here the comparator should not be null as this happens for the root-level block
462       int pos = Bytes.binarySearch(blockKeys, key, comparator);
463       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
464       // binarySearch's javadoc.
465 
466       if (pos >= 0) {
467         // This means this is an exact match with an element of blockKeys.
468         assert pos < blockKeys.length;
469         return pos;
470       }
471 
472       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
473       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
474       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
475       // key < blockKeys[0], meaning the file does not contain the given key.
476 
477       int i = -pos - 1;
478       assert 0 <= i && i <= blockKeys.length;
479       return i - 1;
480     }
481 
482     @Override
483     public String toString() {
484       StringBuilder sb = new StringBuilder();
485       sb.append("size=" + rootCount).append("\n");
486       for (int i = 0; i < rootCount; i++) {
487         sb.append("key=").append((blockKeys[i]))
488             .append("\n  offset=").append(blockOffsets[i])
489             .append(", dataSize=" + blockDataSizes[i]).append("\n");
490       }
491       return sb.toString();
492     }
493   }
494    /**
495    * The reader will always hold the root-level index in memory. Index
496    * blocks at all other levels will be cached in the LRU cache in practice,
497    * although this API does not enforce that.
498    *
499    * All non-root (leaf and intermediate) index blocks contain what we call a
500    * "secondary index": an array of offsets to the entries within the block.
501    * This allows us to do binary search for the entry corresponding to the
502    * given key without having to deserialize the block.
503    */
504    static abstract class BlockIndexReader implements HeapSize {
505 
506     protected long[] blockOffsets;
507     protected int[] blockDataSizes;
508     protected int rootCount = 0;
509 
510     // Mid-key metadata.
511     protected long midLeafBlockOffset = -1;
512     protected int midLeafBlockOnDiskSize = -1;
513     protected int midKeyEntry = -1;
514 
515     /**
516      * The number of levels in the block index tree. One if there is only root
517      * level, two for root and leaf levels, etc.
518      */
519     protected int searchTreeLevel;
520 
521     /** A way to read {@link HFile} blocks at a given offset */
522     protected CachingBlockReader cachingBlockReader;
523 
524     /**
525      * @return true if the block index is empty.
526      */
527     public abstract boolean isEmpty();
528 
529     /**
530      * Verifies that the block index is non-empty and throws an
531      * {@link IllegalStateException} otherwise.
532      */
533     public void ensureNonEmpty() {
534       if (isEmpty()) {
535         throw new IllegalStateException("Block index is empty or not loaded");
536       }
537     }
538 
539     /**
540      * Return the data block which contains this key. This function will only
541      * be called when the HFile version is larger than 1.
542      *
543      * @param key the key we are looking for
544      * @param currentBlock the current block, to avoid re-reading the same block
545      * @param cacheBlocks whether blocks read should be added to the block cache
546      * @param pread whether to use positional read
547      * @param isCompaction whether the read is part of a compaction
548      * @param expectedDataBlockEncoding the data block encoding the caller is
549      *          expecting the data block to be in, or null to not perform this
550      *          check and return the block irrespective of the encoding
551      * @return the data block that contains the given key, or null if the key is not present
552      * @throws IOException
553      */
554     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
555         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
556         throws IOException {
557       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
558           cacheBlocks,
559           pread, isCompaction, expectedDataBlockEncoding);
560       if (blockWithScanInfo == null) {
561         return null;
562       } else {
563         return blockWithScanInfo.getHFileBlock();
564       }
565     }
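    // A hedged caller-side sketch (indexReader and cell are assumed to be in scope, as an
    // HFile scanner would have them); this method simply unwraps the BlockWithScanInfo
    // produced by loadDataBlockWithScanInfo():
    //
    //   HFileBlock dataBlock = indexReader.seekToDataBlock(cell, null /* currentBlock */,
    //       true /* cacheBlocks */, true /* pread */, false /* isCompaction */,
    //       null /* accept any data block encoding */);
    //   if (dataBlock == null) {
    //     // the requested key is before the first key of the file
    //   }
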
566 
567     /**
568      * Return the BlockWithScanInfo which contains the DataBlock with other scan
569      * info such as nextIndexedKey. This function will only be called when the
570      * HFile version is larger than 1.
571      * 
572      * @param key
573      *          the key we are looking for
574      * @param currentBlock
575      *          the current block, to avoid re-reading the same block
576      * @param cacheBlocks whether blocks read should be added to the block cache
577      * @param pread whether to use positional read
578      * @param isCompaction whether the read is part of a compaction
579      * @param expectedDataBlockEncoding the data block encoding the caller is
580      *          expecting the data block to be in, or null to not perform this
581      *          check and return the block irrespective of the encoding.
582      * @return the BlockWithScanInfo which contains the DataBlock with other
583      *         scan info such as nextIndexedKey.
584      * @throws IOException
585      */
586     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
587         boolean cacheBlocks,
588         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
589         throws IOException;
590 
591     /**
592      * An approximation to the {@link HFile}'s mid-key. Operates on block
593      * boundaries, and does not go inside blocks. In other words, returns the
594      * first key of the middle block of the file.
595      *
596      * @return the first key of the middle block
597      */
598     public abstract Cell midkey() throws IOException;
599 
600     /**
601      * @param i from 0 to {@link #getRootBlockCount()} - 1
602      */
603     public long getRootBlockOffset(int i) {
604       return blockOffsets[i];
605     }
606 
607     /**
608      * @param i zero-based index of a root-level block
609      * @return the on-disk size of the root-level block for version 2, or the
610      *         uncompressed size for version 1
611      */
612     public int getRootBlockDataSize(int i) {
613       return blockDataSizes[i];
614     }
615 
616     /**
617      * @return the number of root-level blocks in this block index
618      */
619     public int getRootBlockCount() {
620       return rootCount;
621     }
622 
623     /**
624      * Finds the root-level index block containing the given key.
625      * 
626      * @param key
627      *          Key to find
628      * @param comp
629      *          the comparator to be used
630      * @return Index of the root-level block containing <code>key</code>
631      *         (between 0 and the number of blocks - 1) or -1 if this file
632      *         does not contain the requested key.
633      */
634     // When we want to find the meta index block or bloom block for ROW bloom
635     // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
636     // CellComparator.
637     public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
638         CellComparator comp);
639 
640     /**
641      * Finds the root-level index block containing the given key.
642      *
643      * @param key
644      *          Key to find
645      * @return Index of the root-level block containing <code>key</code>
646      *         (between 0 and the number of blocks - 1) or -1 if this file
647      *         does not contain the requested key.
648      */
649     // When we want to find the meta index block or bloom block for ROW bloom
650     // type
651     // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
652     // need the CellComparator.
653     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
654       return rootBlockContainingKey(key, offset, length, null);
655     }
656 
657     /**
658      * Finds the root-level index block containing the given key.
659      *
660      * @param key
661      *          Key to find
662      */
663     public abstract int rootBlockContainingKey(final Cell key);
664 
665     /**
666      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
667      * @param nonRootIndex
668      * @param i the ith position
669      * @return The indexed key at the ith position in the nonRootIndex.
670      */
671     protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
672       int numEntries = nonRootIndex.getInt(0);
673       if (i < 0 || i >= numEntries) {
674         return null;
675       }
676 
677       // Entries start after the number of entries and the secondary index.
678       // The secondary index takes numEntries + 1 ints.
679       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
680       // Target key's offset relative to the end of the secondary index
681       int targetKeyRelOffset = nonRootIndex.getInt(
682           Bytes.SIZEOF_INT * (i + 1));
683 
684       // The offset of the target key in the blockIndex buffer
685       int targetKeyOffset = entriesOffset     // Skip secondary index
686           + targetKeyRelOffset               // Skip all entries until mid
687           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
688 
689       // We subtract the two consecutive secondary index elements, which
690       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
691       // then need to subtract the overhead of offset and onDiskSize.
692       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
693         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
694 
695       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
696       return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
697     }
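    // Worked example with made-up numbers: for a non-root block with numEntries = 3 and
    // secondary index values {0, 25, 58, 91} (the last value is the total size of all
    // entries), the key of entry i = 1 starts at
    //   entriesOffset (4 * 5 = 20) + targetKeyRelOffset (25) + SECONDARY_INDEX_ENTRY_OVERHEAD (12) = 57
    // and its length is 58 - 25 - 12 = 21 bytes.
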
698 
699     /**
700      * Performs a binary search over a non-root level index block. Utilizes the
701      * secondary index, which records the offsets of (offset, onDiskSize,
702      * firstKey) tuples of all entries.
703      * 
704      * @param key
705      *          the key we are searching for offsets to individual entries in
706      *          the blockIndex buffer
707      * @param nonRootIndex
708      *          the non-root index block buffer, starting with the secondary
709      *          index. The position is ignored.
710      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
711      *         keys[i + 1], if keys is the array of all keys being searched, or
712      *         -1 otherwise
713      * @throws IOException
714      */
715     static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
716         CellComparator comparator) {
717 
718       int numEntries = nonRootIndex.getIntAfterPosition(0);
719       int low = 0;
720       int high = numEntries - 1;
721       int mid = 0;
722 
723       // Entries start after the number of entries and the secondary index.
724       // The secondary index takes numEntries + 1 ints.
725       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
726 
727       // If we imagine that keys[-1] = -Infinity and
728       // keys[numEntries] = Infinity, then we are maintaining an invariant that
729       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
730       ByteBufferedKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferedKeyOnlyKeyValue();
731       ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<ByteBuffer>();
732       while (low <= high) {
733         mid = (low + high) >>> 1;
734 
735         // Midkey's offset relative to the end of secondary index
736         int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
737 
738         // The offset of the middle key in the blockIndex buffer
739         int midKeyOffset = entriesOffset       // Skip secondary index
740             + midKeyRelOffset                  // Skip all entries until mid
741             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
742 
743         // We subtract the two consecutive secondary index elements, which
744         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
745         // then need to subtract the overhead of offset and onDiskSize.
746         int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
747             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
748 
749         // we have to compare in this order, because the comparator order
750         // has special logic when the 'left side' is a special key.
751         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
752         // done after HBASE-12224 & HBASE-12282
753         // TODO avoid array call.
754         nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
755         nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
756         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexkeyOnlyKV);
757 
758         // key lives above the midpoint
759         if (cmp > 0)
760           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
761         // key lives below the midpoint
762         else if (cmp < 0)
763           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
764         else
765           return mid; // exact match
766       }
767 
768       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
769       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
770       // condition, low >= high + 1. Therefore, low = high + 1.
771 
772       if (low != high + 1) {
773         throw new IllegalStateException("Binary search broken: low=" + low
774             + " " + "instead of " + (high + 1));
775       }
776 
777       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
778       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
779       int i = low - 1;
780 
781       // Some extra validation on the result.
782       if (i < -1 || i >= numEntries) {
783         throw new IllegalStateException("Binary search broken: result is " +
784             i + " but expected to be between -1 and (numEntries - 1) = " +
785             (numEntries - 1));
786       }
787 
788       return i;
789     }
790 
791     /**
792      * Search for one key using the secondary index in a non-root block. In case
793      * of success, positions the provided buffer at the entry of interest, where
794      * the file offset and the on-disk-size can be read.
795      *
796      * @param nonRootBlock
797      *          a non-root block without header. Initial position does not
798      *          matter.
799      * @param key
800      *          the byte array containing the key
801      * @return the index position where the given key was found, or -1 if the
802      *         given key is before the first key in the block.
803      *
804      */
805     static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
806         CellComparator comparator) {
807       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
808 
809       if (entryIndex != -1) {
810         int numEntries = nonRootBlock.getIntAfterPosition(0);
811 
812         // The end of secondary index and the beginning of entries themselves.
813         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
814 
815         // The offset of the entry we are interested in relative to the end of
816         // the secondary index.
817         int entryRelOffset = nonRootBlock
818             .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
819 
820         nonRootBlock.position(entriesOffset + entryRelOffset);
821       }
822 
823       return entryIndex;
824     }
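    // A minimal caller-side sketch (buffer, cell and comparator assumed to be in scope),
    // mirroring how loadDataBlockWithScanInfo() uses this method: after a successful
    // search the buffer is positioned at the entry, so the deeper-level block's offset
    // and on-disk size can be read directly.
    //
    //   int entryIndex = locateNonRootIndexEntry(buffer, cell, comparator);
    //   if (entryIndex != -1) {
    //     long childOffset = buffer.getLong();    // file offset of the deeper-level block
    //     int childOnDiskSize = buffer.getInt();  // its on-disk size without header
    //   }
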
825 
826     /**
827      * Read in the root-level index from the given input stream. Must match
828      * what was written into the root level by
829      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
830      * offset that function returned.
831      *
832      * @param in the buffered input stream or wrapped byte input stream
833      * @param numEntries the number of root-level index entries
834      * @throws IOException
835      */
836     public void readRootIndex(DataInput in, final int numEntries) throws IOException {
837       blockOffsets = new long[numEntries];
838       initialize(numEntries);
839       blockDataSizes = new int[numEntries];
840 
841       // If index size is zero, no index was written.
842       if (numEntries > 0) {
843         for (int i = 0; i < numEntries; ++i) {
844           long offset = in.readLong();
845           int dataSize = in.readInt();
846           byte[] key = Bytes.readByteArray(in);
847           add(key, offset, dataSize);
848         }
849       }
850     }
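    // On-disk root-level format consumed by the loop above, repeated numEntries times:
    //   long   block offset
    //   int    on-disk data size
    //   byte[] key, length-prefixed as written by Bytes.writeByteArray()
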
851 
852     protected abstract void initialize(int numEntries);
853 
854     protected abstract void add(final byte[] key, final long offset, final int dataSize);
855 
856     /**
857      * Read in the root-level index from the given input stream. Must match
858      * what was written into the root level by
859      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
860      * offset that function returned.
861      *
862      * @param blk the HFile block
863      * @param numEntries the number of root-level index entries
864      * @return the buffered input stream or wrapped byte input stream
865      * @throws IOException
866      */
867     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
868       DataInputStream in = blk.getByteStream();
869       readRootIndex(in, numEntries);
870       return in;
871     }
872 
873     /**
874      * Read the root-level metadata of a multi-level block index. Based on
875      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
876      * necessary to compute the mid-key in a multi-level index.
877      *
878      * @param blk the HFile block
879      * @param numEntries the number of root-level index entries
880      * @throws IOException
881      */
882     public void readMultiLevelIndexRoot(HFileBlock blk,
883         final int numEntries) throws IOException {
884       DataInputStream in = readRootIndex(blk, numEntries);
885       // After reading the root index, the checksum bytes have to be
886       // subtracted to know whether the mid-key metadata is present.
887       int checkSumBytes = blk.totalChecksumBytes();
888       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
889         // No mid-key metadata available.
890         return;
891       }
892       midLeafBlockOffset = in.readLong();
893       midLeafBlockOnDiskSize = in.readInt();
894       midKeyEntry = in.readInt();
895     }
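    // The optional trailer consumed above is exactly MID_KEY_METADATA_SIZE = 8 + 4 + 4 = 16
    // bytes: the middle leaf block's offset (long), its on-disk size without header (int),
    // and the mid-key entry's index within that leaf block (int). For a single-level index
    // the trailer is absent and the three fields keep their -1 defaults.
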
896 
897     @Override
898     public long heapSize() {
899       // The BlockIndexReader does not have the blockKey, comparator and the midkey atomic reference
900       long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
901           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
902 
903       // Mid-key metadata.
904       heapSize += MID_KEY_METADATA_SIZE;
905 
906       heapSize = calculateHeapSizeForBlockKeys(heapSize);
907 
908       if (blockOffsets != null) {
909         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
910             * Bytes.SIZEOF_LONG);
911       }
912 
913       if (blockDataSizes != null) {
914         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
915             * Bytes.SIZEOF_INT);
916       }
917 
918       return ClassSize.align(heapSize);
919     }
920 
921     protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
922   }
923 
924   /**
925    * Writes the block index into the output stream. Generates the tree from
926    * bottom up. The leaf level is written to disk as a sequence of inline
927    * blocks, if it is larger than a certain number of bytes. If the leaf level
928    * is not large enough, we write all entries to the root level instead.
929    *
930    * After all leaf blocks have been written, we end up with an index
931    * referencing the resulting leaf index blocks. If that index is larger than
932    * the allowed root index size, the writer will break it up into
933    * reasonable-size intermediate-level index block chunks, write those chunks
934    * out, and create another index referencing those chunks. This will be
935    * repeated until the remaining index is small enough to become the root
936    * index. However, in most practical cases we will only have leaf-level
937    * blocks and the root index, or just the root index.
938    */
939   public static class BlockIndexWriter implements InlineBlockWriter {
940     /**
941      * While the index is being written, this represents the current block
942      * index referencing all leaf blocks, with one exception. If the file is
943      * being closed and there are not enough blocks to complete even a single
944      * leaf block, no leaf blocks get written and this contains the entire
945      * block index. After all levels of the index were written by
946      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
947      * root-level index.
948      */
949     private BlockIndexChunk rootChunk = new BlockIndexChunk();
950 
951     /**
952      * Current leaf-level chunk. New entries referencing data blocks get added
953      * to this chunk until it grows large enough to be written to disk.
954      */
955     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
956 
957     /**
958      * The number of block index levels. This is one if there is only root
959    * level (even empty), two if there is a leaf level and a root level, and is
960      * higher if there are intermediate levels. This is only final after
961      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
962      * initial value accounts for the root level, and will be increased to two
963      * as soon as we find out there is a leaf-level in
964      * {@link #blockWritten(long, int, int)}.
965      */
966     private int numLevels = 1;
967 
968     private HFileBlock.Writer blockWriter;
969     private byte[] firstKey = null;
970 
971     /**
972      * The total number of leaf-level entries, i.e. entries referenced by
973      * leaf-level blocks. For the data block index this is equal to the number
974      * of data blocks.
975      */
976     private long totalNumEntries;
977 
978     /** Total compressed size of all index blocks. */
979     private long totalBlockOnDiskSize;
980 
981     /** Total uncompressed size of all index blocks. */
982     private long totalBlockUncompressedSize;
983 
984     /** The maximum size guideline of all multi-level index blocks. */
985     private int maxChunkSize;
986 
987     /** Whether we require this block index to always be single-level. */
988     private boolean singleLevelOnly;
989 
990     /** CacheConfig, or null if cache-on-write is disabled */
991     private CacheConfig cacheConf;
992 
993     /** Name to use for computing cache keys */
994     private String nameForCaching;
995 
996     /** Creates a single-level block index writer */
997     public BlockIndexWriter() {
998       this(null, null, null);
999       singleLevelOnly = true;
1000     }
1001 
1002     /**
1003      * Creates a multi-level block index writer.
1004      *
1005      * @param blockWriter the block writer to use to write index blocks
1006      * @param cacheConf used to determine when and how a block should be cached-on-write.
1007      */
1008     public BlockIndexWriter(HFileBlock.Writer blockWriter,
1009         CacheConfig cacheConf, String nameForCaching) {
1010       if ((cacheConf == null) != (nameForCaching == null)) {
1011         throw new IllegalArgumentException("Block cache and file name for " +
1012             "caching must be both specified or both null");
1013       }
1014 
1015       this.blockWriter = blockWriter;
1016       this.cacheConf = cacheConf;
1017       this.nameForCaching = nameForCaching;
1018       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1019     }
1020 
1021     public void setMaxChunkSize(int maxChunkSize) {
1022       if (maxChunkSize <= 0) {
1023         throw new IllegalArgumentException("Invalid maximum index block size");
1024       }
1025       this.maxChunkSize = maxChunkSize;
1026     }
1027 
1028     /**
1029      * Writes the root level and intermediate levels of the block index into
1030      * the output stream, generating the tree from bottom up. Assumes that the
1031      * leaf level has been inline-written to the disk if there is enough data
1032      * for more than one leaf block. We iterate by breaking the current level
1033      * of the block index, starting with the index of all leaf-level blocks,
1034      * into chunks small enough to be written to disk, and generate its parent
1035      * level, until we end up with a level small enough to become the root
1036      * level.
1037      *
1038      * If the leaf level is not large enough, there is no inline block index
1039      * anymore, so we only write that level of block index to disk as the root
1040      * level.
1041      *
1042      * @param out FSDataOutputStream
1043      * @return position at which we entered the root-level index.
1044      * @throws IOException
1045      */
1046     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1047       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1048         throw new IOException("Trying to write a multi-level block index, " +
1049             "but are " + curInlineChunk.getNumEntries() + " entries in the " +
1050             "last inline chunk.");
1051       }
1052 
1053       // We need to get mid-key metadata before we create intermediate
1054       // indexes and overwrite the root chunk.
1055       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1056           : null;
1057 
1058       if (curInlineChunk != null) {
1059         while (rootChunk.getRootSize() > maxChunkSize) {
1060           rootChunk = writeIntermediateLevel(out, rootChunk);
1061           numLevels += 1;
1062         }
1063       }
1064 
1065       // write the root level
1066       long rootLevelIndexPos = out.getPos();
1067 
1068       {
1069         DataOutput blockStream =
1070             blockWriter.startWriting(BlockType.ROOT_INDEX);
1071         rootChunk.writeRoot(blockStream);
1072         if (midKeyMetadata != null)
1073           blockStream.write(midKeyMetadata);
1074         blockWriter.writeHeaderAndData(out);
1075       }
1076 
1077       // Add root index block size
1078       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1079       totalBlockUncompressedSize +=
1080           blockWriter.getUncompressedSizeWithoutHeader();
1081 
1082       if (LOG.isTraceEnabled()) {
1083         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1084           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1085           + " root-level entries, " + totalNumEntries + " total entries, "
1086           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1087           " on-disk size, "
1088           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1089           " total uncompressed size.");
1090       }
1091       return rootLevelIndexPos;
1092     }
1093 
1094     /**
1095      * Writes the block index data as a single level only. Does not do any
1096      * block framing.
1097      *
1098      * @param out the buffered output stream to write the index to. Typically a
1099      *          stream writing into an {@link HFile} block.
1100      * @param description a short description of the index being written. Used
1101      *          in a log message.
1102      * @throws IOException
1103      */
1104     public void writeSingleLevelIndex(DataOutput out, String description)
1105         throws IOException {
1106       expectNumLevels(1);
1107 
1108       if (!singleLevelOnly)
1109         throw new IOException("Single-level mode is turned off");
1110 
1111       if (rootChunk.getNumEntries() > 0)
1112         throw new IOException("Root-level entries already added in " +
1113             "single-level mode");
1114 
1115       rootChunk = curInlineChunk;
1116       curInlineChunk = new BlockIndexChunk();
1117 
1118       if (LOG.isTraceEnabled()) {
1119         LOG.trace("Wrote a single-level " + description + " index with "
1120           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1121           + " bytes");
1122       }
1123       rootChunk.writeRoot(out);
1124     }
1125 
1126     /**
1127      * Split the current level of the block index into intermediate index
1128      * blocks of permitted size and write those blocks to disk. Return the next
1129      * level of the block index referencing those intermediate-level blocks.
1130      *
1131      * @param out
1132      * @param currentLevel the current level of the block index, such as the
1133      *          chunk referencing all leaf-level index blocks
1134      * @return the parent level block index, which becomes the root index after
1135      *         a few (usually zero) iterations
1136      * @throws IOException
1137      */
1138     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1139         BlockIndexChunk currentLevel) throws IOException {
1140       // Entries referencing intermediate-level blocks we are about to create.
1141       BlockIndexChunk parent = new BlockIndexChunk();
1142 
1143       // The current intermediate-level block index chunk.
1144       BlockIndexChunk curChunk = new BlockIndexChunk();
1145 
1146       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1147         curChunk.add(currentLevel.getBlockKey(i),
1148             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1149 
1150         if (curChunk.getRootSize() >= maxChunkSize)
1151           writeIntermediateBlock(out, parent, curChunk);
1152       }
1153 
1154       if (curChunk.getNumEntries() > 0) {
1155         writeIntermediateBlock(out, parent, curChunk);
1156       }
1157 
1158       return parent;
1159     }
1160 
1161     private void writeIntermediateBlock(FSDataOutputStream out,
1162         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1163       long beginOffset = out.getPos();
1164       DataOutputStream dos = blockWriter.startWriting(
1165           BlockType.INTERMEDIATE_INDEX);
1166       curChunk.writeNonRoot(dos);
1167       byte[] curFirstKey = curChunk.getBlockKey(0);
1168       blockWriter.writeHeaderAndData(out);
1169 
1170       if (cacheConf != null) {
1171         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1172         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1173           beginOffset), blockForCaching);
1174       }
1175 
1176       // Add intermediate index block size
1177       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1178       totalBlockUncompressedSize +=
1179           blockWriter.getUncompressedSizeWithoutHeader();
1180 
1181       // OFFSET is the beginning offset of the chunk of block index entries.
1182       // SIZE is the total byte size of the chunk of block index entries
1183       // + the secondary index size
1184       // FIRST_KEY is the first key in the chunk of block index
1185       // entries.
1186       parent.add(curFirstKey, beginOffset,
1187           blockWriter.getOnDiskSizeWithHeader());
1188 
1189       // clear current block index chunk
1190       curChunk.clear();
1191       curFirstKey = null;
1192     }
1193 
1194     /**
1195      * @return how many block index entries there are in the root level
1196      */
1197     public final int getNumRootEntries() {
1198       return rootChunk.getNumEntries();
1199     }
1200 
1201     /**
1202      * @return the number of levels in this block index.
1203      */
1204     public int getNumLevels() {
1205       return numLevels;
1206     }
1207 
1208     private void expectNumLevels(int expectedNumLevels) {
1209       if (numLevels != expectedNumLevels) {
1210         throw new IllegalStateException("Number of block index levels is "
1211             + numLevels + " but is expected to be " + expectedNumLevels);
1212       }
1213     }
1214 
1215     /**
1216      * Whether there is an inline block ready to be written. In general, we
1217      * write a leaf-level index block as an inline block as soon as its size
1218      * as serialized in the non-root format reaches a certain threshold.
1219      */
1220     @Override
1221     public boolean shouldWriteBlock(boolean closing) {
1222       if (singleLevelOnly) {
1223         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1224       }
1225 
1226       if (curInlineChunk == null) {
1227         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1228             "called with closing=true and then called again?");
1229       }
1230 
1231       if (curInlineChunk.getNumEntries() == 0) {
1232         return false;
1233       }
1234 
1235       // We do have some entries in the current inline chunk.
1236       if (closing) {
1237         if (rootChunk.getNumEntries() == 0) {
1238           // We did not add any leaf-level blocks yet. Instead of creating a
1239           // leaf level with one block, move these entries to the root level.
1240 
1241           expectNumLevels(1);
1242           rootChunk = curInlineChunk;
1243           curInlineChunk = null;  // Disallow adding any more index entries.
1244           return false;
1245         }
1246 
1247         return true;
1248       } else {
1249         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1250       }
1251     }
1252 
1253     /**
1254      * Write out the current inline index block. Inline blocks are non-root
1255      * blocks, so the non-root index format is used.
1256      *
1257      * @param out
1258      */
1259     @Override
1260     public void writeInlineBlock(DataOutput out) throws IOException {
1261       if (singleLevelOnly)
1262         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1263 
1264       // Write the inline block index to the output stream in the non-root
1265       // index block format.
1266       curInlineChunk.writeNonRoot(out);
1267 
1268       // Save the first key of the inline block so that we can add it to the
1269       // parent-level index.
1270       firstKey = curInlineChunk.getBlockKey(0);
1271 
1272       // Start a new inline index block
1273       curInlineChunk.clear();
1274     }
1275 
1276     /**
1277      * Called after an inline block has been written so that we can add an
1278      * entry referring to that block to the parent-level index.
1279      */
1280     @Override
1281     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1282       // Add leaf index block size
1283       totalBlockOnDiskSize += onDiskSize;
1284       totalBlockUncompressedSize += uncompressedSize;
1285 
1286       if (singleLevelOnly)
1287         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1288 
1289       if (firstKey == null) {
1290         throw new IllegalStateException("Trying to add second-level index " +
1291             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1292             "but the first key was not set in writeInlineBlock");
1293       }
1294 
1295       if (rootChunk.getNumEntries() == 0) {
1296         // We are writing the first leaf block, so increase index level.
1297         expectNumLevels(1);
1298         numLevels = 2;
1299       }
1300 
1301       // Add another entry to the second-level index. Include the number of
1302       // entries in all previous leaf-level chunks for mid-key calculation.
1303       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1304       firstKey = null;
1305     }
1306 
1307     @Override
1308     public BlockType getInlineBlockType() {
1309       return BlockType.LEAF_INDEX;
1310     }
1311 
1312     /**
1313      * Add one index entry to the current leaf-level block. When the leaf-level
1314      * block gets large enough, it will be flushed to disk as an inline block.
1315      *
1316      * @param firstKey the first key of the data block
1317      * @param blockOffset the offset of the data block
1318      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1319      *          format version 2), or the uncompressed size of the data block (
1320      *          {@link HFile} format version 1).
1321      */
1322     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1323       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1324       ++totalNumEntries;
1325     }
1326 
1327     /**
1328      * @throws IOException if we happened to write a multi-level index.
1329      */
1330     public void ensureSingleLevel() throws IOException {
1331       if (numLevels > 1) {
1332         throw new IOException ("Wrote a " + numLevels + "-level index with " +
1333             rootChunk.getNumEntries() + " root-level entries, but " +
1334             "this is expected to be a single-level block index.");
1335       }
1336     }
1337 
1338     /**
1339      * @return true if we are using cache-on-write. This is configured by the
1340      *         caller of the constructor by either passing a valid cache
1341      *         configuration or null.
1342      */
1343     @Override
1344     public boolean getCacheOnWrite() {
1345       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1346     }
1347 
1348     /**
1349      * The total uncompressed size of the root index block, intermediate-level
1350      * index blocks, and leaf-level index blocks.
1351      *
1352      * @return the total uncompressed size of all index blocks
1353      */
1354     public long getTotalUncompressedSize() {
1355       return totalBlockUncompressedSize;
1356     }
1357 
1358   }
1359 
1360   /**
1361    * A single chunk of the block index in the process of writing. The data in
1362    * this chunk can become a leaf-level, intermediate-level, or root index
1363    * block.
1364    */
1365   static class BlockIndexChunk {
1366 
1367     /** First keys of the key range corresponding to each index entry. */
1368     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1369 
1370     /** Block offset in backing stream. */
1371     private final List<Long> blockOffsets = new ArrayList<Long>();
1372 
1373     /** On-disk data sizes of lower-level data or index blocks. */
1374     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1375 
1376     /**
1377      * The cumulative number of sub-entries, i.e. entries in deeper-level block
1378      * index blocks. numSubEntriesAt[i] is the number of sub-entries in the
1379      * blocks corresponding to this chunk's entries #0 through #i inclusive.
1380      */
1381     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1382 
1383     /**
1384      * The offset of the next entry to be added, relative to the end of the
1385      * "secondary index" in the "non-root" format representation of this index
1386      * chunk. This is the next value to be added to the secondary index.
1387      */
1388     private int curTotalNonRootEntrySize = 0;
1389 
1390     /**
1391      * The accumulated size of this chunk if stored in the root index format.
1392      */
1393     private int curTotalRootSize = 0;
1394 
1395     /**
1396      * The "secondary index" used for binary search over variable-length
1397      * records in a "non-root" format block. These offsets are relative to the
1398      * end of this secondary index.
1399      */
1400     private final List<Integer> secondaryIndexOffsetMarks =
1401         new ArrayList<Integer>();
1402 
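         /*
          * A small illustration (assumed key lengths, offsets and sizes) of the parallel
          * lists above after two add() calls with 5- and 7-byte first keys:
          *
          *   blockKeys                 = [ k0 (5 bytes), k1 (7 bytes) ]
          *   blockOffsets              = [ 0, 4096 ]
          *   onDiskDataSizes           = [ 4096, 3800 ]
          *   secondaryIndexOffsetMarks = [ 0, 17 ]  // 17 = SECONDARY_INDEX_ENTRY_OVERHEAD (12) + 5
          *   curTotalNonRootEntrySize  = 36         // 17 + (12 + 7)
          */
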
1403     /**
1404      * Adds a new entry to this block index chunk.
1405      *
1406      * @param firstKey the first key in the block pointed to by this entry
1407      * @param blockOffset the offset of the next-level block pointed to by this
1408      *          entry
1409      * @param onDiskDataSize the on-disk data size of the block pointed to by
1410      *          this entry, including the header size
1411      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1412      *          construction, this specifies the current total number of
1413      *          sub-entries in all leaf-level chunks, including the one
1414      *          corresponding to the second-level entry being added.
1415      */
1416     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1417         long curTotalNumSubEntries) {
1418       // Record the offset for the secondary index
1419       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1420       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1421           + firstKey.length;
1422 
1423       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1424           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1425 
1426       blockKeys.add(firstKey);
1427       blockOffsets.add(blockOffset);
1428       onDiskDataSizes.add(onDiskDataSize);
1429 
1430       if (curTotalNumSubEntries != -1) {
1431         numSubEntriesAt.add(curTotalNumSubEntries);
1432 
1433         // Make sure the parallel arrays are in sync.
1434         if (numSubEntriesAt.size() != blockKeys.size()) {
1435           throw new IllegalStateException("Only have key/value count " +
1436               "stats for " + numSubEntriesAt.size() + " block index " +
1437               "entries out of " + blockKeys.size());
1438         }
1439       }
1440     }
1441 
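         /*
          * Worked example of the size bookkeeping above, using the two-entry
          * illustration from the field comments (keys of 5 and 7 bytes):
          *
          *   non-root: curTotalNonRootEntrySize = (12 + 5) + (12 + 7)               = 36
          *   root:     curTotalRootSize         = (8 + 4 + 1 + 5) + (8 + 4 + 1 + 7) = 38
          *
          * where 12 is SECONDARY_INDEX_ENTRY_OVERHEAD and 1 is the vint size of the key
          * length (WritableUtils.getVIntSize(n) == 1 for small n).
          */
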
1442     /**
1443      * The same as {@link #add(byte[], long, int, long)} but does not track the
1444      * cumulative key/value (sub-entry) count. Used for single-level indexes.
1445      *
1446      * @see #add(byte[], long, int, long)
1447      */
1448     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1449       add(firstKey, blockOffset, onDiskDataSize, -1);
1450     }
1451 
1452     public void clear() {
1453       blockKeys.clear();
1454       blockOffsets.clear();
1455       onDiskDataSizes.clear();
1456       secondaryIndexOffsetMarks.clear();
1457       numSubEntriesAt.clear();
1458       curTotalNonRootEntrySize = 0;
1459       curTotalRootSize = 0;
1460     }
1461 
1462     /**
1463      * Finds the entry corresponding to the deeper-level index block containing
1464      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1465      * ordering of sub-entries.
1466      *
1467      * <p>
1468      * <i>Implementation note.</i> We are looking for i such that
1469      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because deeper-level
1470      * block #i (0-based) contains sub-entries numSubEntriesAt[i - 1] through
1471      * numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1472      * sub-entries. i is by definition the insertion point of k in
1473      * numSubEntriesAt.
1474      *
1475      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1476      * @return the 0-based index of the entry corresponding to the given
1477      *         sub-entry
1478      */
1479     public int getEntryBySubEntry(long k) {
1480       // We define mid-key as the key corresponding to k'th sub-entry
1481       // (0-based).
1482 
1483       int i = Collections.binarySearch(numSubEntriesAt, k);
1484 
1485       // Exact match: numSubEntriesAt[i] == k. This means chunks #0 through
1486       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1487       // is in the (i + 1)'th chunk.
1488       if (i >= 0)
1489         return i + 1;
1490 
1491       // Inexact match. Return the insertion point.
1492       return -i - 1;
1493     }
1494 
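         /*
          * Worked example (hypothetical counts): with numSubEntriesAt = [4, 9, 12],
          * chunk #0 holds sub-entries 0..3, chunk #1 holds 4..8, chunk #2 holds 9..11.
          *
          *   getEntryBySubEntry(5) == 1   // binarySearch returns -2; insertion point 1
          *   getEntryBySubEntry(9) == 2   // exact match at index 1, so 1 + 1
          */
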
1495     /**
1496      * Used when writing the root block index of a multi-level block index.
1497      * Serializes additional information that allows the mid-key to be
1498      * identified efficiently.
1499      *
1500      * @return a few serialized fields for finding the mid-key
1501      * @throws IOException if the mid-key metadata could not be created
1502      */
1503     public byte[] getMidKeyMetadata() throws IOException {
1504       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1505           MID_KEY_METADATA_SIZE);
1506       DataOutputStream baosDos = new DataOutputStream(baos);
1507       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1508       if (totalNumSubEntries == 0) {
1509         throw new IOException("No leaf-level entries, mid-key unavailable");
1510       }
1511       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1512       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1513 
1514       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1515       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1516 
1517       long numSubEntriesBefore = midKeyEntry > 0
1518           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1519       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1520       if (subEntryWithinEntry < 0
1521           || subEntryWithinEntry > Integer.MAX_VALUE) {
1522         throw new IOException("Could not identify mid-key index within the "
1523             + "leaf-level block containing mid-key: out of range ("
1524             + subEntryWithinEntry + ", numSubEntriesBefore="
1525             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1526             + ")");
1527       }
1528 
1529       baosDos.writeInt((int) subEntryWithinEntry);
1530 
1531       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1532         throw new IOException("Could not write mid-key metadata: size=" +
1533             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1534       }
1535 
1536       // Close just to be good citizens, although this has no effect.
1537       baos.close();
1538 
1539       return baos.toByteArray();
1540     }
1541 
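         /*
          * The metadata written above is a fixed-size record of MID_KEY_METADATA_SIZE
          * (8 + 4 + 4 = 16) bytes. A minimal reader-side sketch, assuming 'meta' is a
          * ByteBuffer positioned at the start of the record:
          *
          *   long midLeafOffset     = meta.getLong();  // offset of the leaf block holding the mid-key
          *   int  midLeafOnDiskSize = meta.getInt();   // on-disk size of that leaf block
          *   int  midKeyEntry       = meta.getInt();   // entry index of the mid-key within that block
          */
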
1542     /**
1543      * Writes the block index chunk in the non-root index block format. This
1544      * format contains the number of entries, an index of integer offsets
1545      * for quick binary search on variable-length records, and tuples of
1546      * block offset, on-disk block size, and the first key for each entry.
1547      *
1548      * @param out the output stream to write the non-root index block to
1549      * @throws IOException if writing to the output stream fails
1550      */
1551     void writeNonRoot(DataOutput out) throws IOException {
1552       // The number of entries in the block.
1553       out.writeInt(blockKeys.size());
1554 
1555       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1556         throw new IOException("Corrupted block index chunk writer: " +
1557             blockKeys.size() + " entries but " +
1558             secondaryIndexOffsetMarks.size() + " secondary index items");
1559       }
1560 
1561       // For each entry, write a "secondary index" of relative offsets to the
1562       // entries from the end of the secondary index. This works because at
1563       // read time we read the number of entries and know where the secondary
1564       // index ends.
1565       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1566         out.writeInt(currentSecondaryIndex);
1567 
1568       // We include one extra element in the secondary index so that the size of
1569       // each entry can be computed by subtracting consecutive secondary index values.
1570       out.writeInt(curTotalNonRootEntrySize);
1571 
1572       for (int i = 0; i < blockKeys.size(); ++i) {
1573         out.writeLong(blockOffsets.get(i));
1574         out.writeInt(onDiskDataSizes.get(i));
1575         out.write(blockKeys.get(i));
1576       }
1577     }
1578 
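         /*
          * Resulting on-disk layout (non-root format) for the running two-entry example
          * (5- and 7-byte keys; offsets and sizes are hypothetical):
          *
          *   int  2                       // number of entries
          *   int  0, 17, 36               // secondary index: entry offsets + total entry size
          *   long 0,    int 4096, key0    // entry 0: block offset, on-disk size, 5-byte key
          *   long 4096, int 3800, key1    // entry 1: block offset, on-disk size, 7-byte key
          *
          * Total: 4 + 4 * 3 + 36 = 52 bytes, which is exactly what getNonRootSize()
          * returns for this chunk.
          */
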
1579     /**
1580      * @return the size of this chunk if stored in the non-root index block
1581      *         format
1582      */
1583     int getNonRootSize() {
1584       return Bytes.SIZEOF_INT                          // Number of entries
1585           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1586           + curTotalNonRootEntrySize;                  // All entries
1587     }
1588 
1589     /**
1590      * Writes this chunk into the given output stream in the root block index
1591      * format. This format is similar to the {@link HFile} version 1 block
1592      * index format, except that we store the on-disk size of the block instead
1593      * of its uncompressed size.
1594      *
1595      * @param out the data output stream to write the block index to. Typically
1596      *          a stream writing into an {@link HFile} block.
1597      * @throws IOException if writing to the output stream fails
1598      */
1599     void writeRoot(DataOutput out) throws IOException {
1600       for (int i = 0; i < blockKeys.size(); ++i) {
1601         out.writeLong(blockOffsets.get(i));
1602         out.writeInt(onDiskDataSizes.get(i));
1603         Bytes.writeByteArray(out, blockKeys.get(i));
1604       }
1605     }
1606 
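         /*
          * In the root format each entry is a long offset, an int on-disk size, and the
          * key written via Bytes.writeByteArray() (vint length + key bytes). For the
          * running two-entry example this is (8 + 4 + 1 + 5) + (8 + 4 + 1 + 7) = 38
          * bytes, matching getRootSize(), since add() accumulates curTotalRootSize with
          * the same formula.
          */
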
1607     /**
1608      * @return the size of this chunk if stored in the root index block format
1609      */
1610     int getRootSize() {
1611       return curTotalRootSize;
1612     }
1613 
1614     /**
1615      * @return the number of entries in this block index chunk
1616      */
1617     public int getNumEntries() {
1618       return blockKeys.size();
1619     }
1620 
1621     public byte[] getBlockKey(int i) {
1622       return blockKeys.get(i);
1623     }
1624 
1625     public long getBlockOffset(int i) {
1626       return blockOffsets.get(i);
1627     }
1628 
1629     public int getOnDiskDataSize(int i) {
1630       return onDiskDataSizes.get(i);
1631     }
1632 
1633     public long getCumulativeNumKV(int i) {
1634       if (i < 0)
1635         return 0;
1636       return numSubEntriesAt.get(i);
1637     }
1638 
1639   }
1640 
1641   public static int getMaxChunkSize(Configuration conf) {
1642     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1643   }
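
       /*
        * Illustrative usage (hypothetical value): lowering the index chunk size so that
        * leaf and intermediate index blocks are flushed more often.
        *
        *   Configuration conf = HBaseConfiguration.create();
        *   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
        *   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);  // 65536
        */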
1644 }