1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Comparator;
28  import java.util.Map;
29  import java.util.SortedSet;
30  import java.util.UUID;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.CellComparator;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.client.Scan;
47  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
48  import org.apache.hadoop.hbase.io.hfile.BlockType;
49  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
50  import org.apache.hadoop.hbase.io.hfile.HFile;
51  import org.apache.hadoop.hbase.io.hfile.HFileContext;
52  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
53  import org.apache.hadoop.hbase.nio.ByteBuff;
54  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
55  import org.apache.hadoop.hbase.util.BloomFilter;
56  import org.apache.hadoop.hbase.util.BloomFilterFactory;
57  import org.apache.hadoop.hbase.util.BloomFilterWriter;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.Writables;
60  import org.apache.hadoop.io.WritableUtils;
61  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
62  
63  import com.google.common.base.Function;
64  import com.google.common.base.Preconditions;
65  import com.google.common.collect.ImmutableList;
66  import com.google.common.collect.Ordering;
67  
68  /**
69   * A Store data file.  Stores usually have one or more of these files.  They
70   * are produced by flushing the memstore to disk.  To
71   * create, instantiate a writer using {@link StoreFile.WriterBuilder}
72   * and append data. Be sure to add any metadata before calling close on the
73   * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
74   * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
75   * passing filesystem and path.  To read, call {@link #createReader()}.
76   * <p>StoreFiles may also reference store files in another Store.
77   *
78   * The reason for this pattern, where different instances are used for the
79   * writer and the reader, is that we write once but read a lot more.
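     *
     * <p>A minimal usage sketch. The names <code>familyDir</code>,
     * <code>fileContext</code>, <code>maxSeqId</code> and <code>cell</code>
     * are hypothetical placeholders, not part of this API:
     * <pre>
     * StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
     *     .withOutputDir(familyDir)
     *     .withFileContext(fileContext)
     *     .build();
     * w.append(cell);                    // repeat for every Cell
     * w.appendMetadata(maxSeqId, false); // add metadata BEFORE close()
     * w.close();
     *
     * StoreFile sf = new StoreFile(fs, w.getPath(), conf, cacheConf, BloomType.NONE);
     * StoreFile.Reader r = sf.createReader();
     * </pre>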
80   */
81  @InterfaceAudience.LimitedPrivate("Coprocessor")
82  public class StoreFile {
83    private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
84  
85    // Keys for fileinfo values in HFile
86  
87    /** Max Sequence ID in FileInfo */
88    public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
89  
90    /** Major compaction flag in FileInfo */
91    public static final byte[] MAJOR_COMPACTION_KEY =
92        Bytes.toBytes("MAJOR_COMPACTION_KEY");
93  
94    /** Exclude-from-minor-compaction flag in FileInfo */
95    public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
96        Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
97  
98    /** Bloom filter Type in FileInfo */
99    public static final byte[] BLOOM_FILTER_TYPE_KEY =
100       Bytes.toBytes("BLOOM_FILTER_TYPE");
101 
102   /** Delete Family Count in FileInfo */
103   public static final byte[] DELETE_FAMILY_COUNT =
104       Bytes.toBytes("DELETE_FAMILY_COUNT");
105 
106   /** Last Bloom filter key in FileInfo */
107   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
108 
109   /** Key for Timerange information in metadata*/
110   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
111 
112   /** Key for timestamp of earliest-put in metadata*/
113   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
114 
115   /** Key for the number of mob cells in metadata*/
116   public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
117 
118   private final StoreFileInfo fileInfo;
119   private final FileSystem fs;
120 
121   // Block cache configuration and reference.
122   private final CacheConfig cacheConf;
123 
124   // Keys for metadata stored in backing HFile.
125   // Set when we obtain a Reader.
126   private long sequenceid = -1;
127 
128   // max of the MemstoreTS in the KV's in this store
129   // Set when we obtain a Reader.
130   private long maxMemstoreTS = -1;
131 
132   // firstKey, lastKey and comparator are set when the reader is opened.
133   private Cell firstKey;
134 
135   private Cell lastKey;
136 
137   private Comparator comparator;
138 
139   public Cell getFirstKey() {
140     return firstKey;
141   }
142 
143   public Cell getLastKey() {
144     return lastKey;
145   }
146 
147   public Comparator getComparator() {
148     return comparator;
149   }
150 
151   public long getMaxMemstoreTS() {
152     return maxMemstoreTS;
153   }
154 
155   public void setMaxMemstoreTS(long maxMemstoreTS) {
156     this.maxMemstoreTS = maxMemstoreTS;
157   }
158 
159   // If true, this file was the product of a major compaction.  It's set
160   // whenever you get a Reader.
161   private AtomicBoolean majorCompaction = null;
162 
163   // If true, this file should not be included in minor compactions.
164   // It's set whenever you get a Reader.
165   private boolean excludeFromMinorCompaction = false;
166 
167   /** Meta key set when store file is a result of a bulk load */
168   public static final byte[] BULKLOAD_TASK_KEY =
169     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
170   public static final byte[] BULKLOAD_TIME_KEY =
171     Bytes.toBytes("BULKLOAD_TIMESTAMP");
172 
173   /**
174    * Map of the metadata entries in the corresponding HFile
175    */
176   private Map<byte[], byte[]> metadataMap;
177 
178   // StoreFile.Reader
179   private volatile Reader reader;
180 
181   /**
182    * Bloom filter type specified in column family configuration. Does not
183    * necessarily correspond to the Bloom filter type present in the HFile.
184    */
185   private final BloomType cfBloomType;
186 
187   /**
188    * Key for skipping resetting sequence id in metadata.
189    * For bulk loaded hfiles, the scanner resets the cell seqId to the latest one;
190    * if this metadata is set to true, the reset is skipped.
191    */
192   public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
193 
194   /**
195    * Constructor, loads a reader and its indices, etc. May allocate a
196    * substantial amount of RAM depending on the underlying files (10-20MB?).
197    *
198    * @param fs  The current file system to use.
199    * @param p  The path of the file.
200    * @param conf  The current configuration.
201    * @param cacheConf  The cache configuration and block cache reference.
202    * @param cfBloomType The bloom type to use for this store file as specified
203    *          by column family configuration. This may or may not be the same
204    *          as the Bloom filter type actually present in the HFile, because
205    *          column family configuration might change. If this is
206    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
207    * @throws IOException When opening the reader fails.
208    */
209   public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
210         final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
211     this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
212   }
213 
214 
215   /**
216    * Constructor, loads a reader and its indices, etc. May allocate a
217    * substantial amount of RAM depending on the underlying files (10-20MB?).
218    *
219    * @param fs  The current file system to use.
220    * @param fileInfo  The store file information.
221    * @param conf  The current configuration.
222    * @param cacheConf  The cache configuration and block cache reference.
223    * @param cfBloomType The bloom type to use for this store file as specified
224    *          by column family configuration. This may or may not be the same
225    *          as the Bloom filter type actually present in the HFile, because
226    *          column family configuration might change. If this is
227    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
228    * @throws IOException When opening the reader fails.
229    */
230   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
231       final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
232     this.fs = fs;
233     this.fileInfo = fileInfo;
234     this.cacheConf = cacheConf;
235 
236     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
237       this.cfBloomType = cfBloomType;
238     } else {
239       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
240           "cfBloomType=" + cfBloomType + " (disabled in config)");
241       this.cfBloomType = BloomType.NONE;
242     }
243   }
244 
245   /**
246    * Clone
247    * @param other The StoreFile to clone from
248    */
249   public StoreFile(final StoreFile other) {
250     this.fs = other.fs;
251     this.fileInfo = other.fileInfo;
252     this.cacheConf = other.cacheConf;
253     this.cfBloomType = other.cfBloomType;
254   }
255 
256   /**
257    * @return the StoreFileInfo object associated with this StoreFile.
259    */
260   public StoreFileInfo getFileInfo() {
261     return this.fileInfo;
262   }
263 
264   /**
265    * @return Path or null if this StoreFile was made with a Stream.
266    */
267   public Path getPath() {
268     return this.fileInfo.getPath();
269   }
270 
271   /**
272    * @return The qualified path of this StoreFile
273    */
274   public Path getQualifiedPath() {
275     return this.fileInfo.getPath().makeQualified(fs);
276   }
277 
278   /**
279    * @return True if this is a StoreFile Reference; call
280    * after {@link #open(boolean)} or you may get a wrong answer.
281    */
282   public boolean isReference() {
283     return this.fileInfo.isReference();
284   }
285 
286   /**
287    * @return True if this file was made by a major compaction.
288    */
289   public boolean isMajorCompaction() {
290     if (this.majorCompaction == null) {
291       throw new NullPointerException("This has not been set yet");
292     }
293     return this.majorCompaction.get();
294   }
295 
296   /**
297    * @return True if this file should not be part of a minor compaction.
298    */
299   public boolean excludeFromMinorCompaction() {
300     return this.excludeFromMinorCompaction;
301   }
302 
303   /**
304    * @return This file's maximum edit sequence id.
305    */
306   public long getMaxSequenceId() {
307     return this.sequenceid;
308   }
309 
310   public long getModificationTimeStamp() throws IOException {
311     return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
312   }
313 
314   /**
315    * Only used by the Striped Compaction Policy
316    * @param key the metadata key to look up
317    * @return value associated with the metadata key
318    */
319   public byte[] getMetadataValue(byte[] key) {
320     return metadataMap.get(key);
321   }
322 
323   /**
324    * Return the largest memstoreTS found across all storefiles in
325    * the given list. Store files that were created by a mapreduce
326    * bulk load are ignored, as they do not correspond to any specific
327    * put operation, and thus do not have a memstoreTS associated with them.
328    * @return 0 if no non-bulk-load files are provided or this Store does
329    * not yet have any store files.
330    */
331   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
332     long max = 0;
333     for (StoreFile sf : sfs) {
334       if (!sf.isBulkLoadResult()) {
335         max = Math.max(max, sf.getMaxMemstoreTS());
336       }
337     }
338     return max;
339   }
340 
341   /**
342    * Return the highest sequence ID found across all storefiles in
343    * the given list.
344    * @param sfs the store files to check
345    * @return 0 if no files are provided or this Store does not yet have
346    * any store files.
347    */
348   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
349     long max = 0;
350     for (StoreFile sf : sfs) {
351       max = Math.max(max, sf.getMaxSequenceId());
352     }
353     return max;
354   }
355 
356   /**
357    * Check if this storefile was created by bulk load.
358    * When an hfile is bulk loaded into HBase, we append
359    * '_SeqId_<id-when-loaded>' to the hfile name, unless
360    * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
361    * explicitly turned off.
362    * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
363    * is turned off, fall back to BULKLOAD_TIME_KEY.
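       * For example, a bulk loaded file might be named
       * "2fd2f34ab0e54cc9a2a1f2e8d3f4b5c6_SeqId_4_" (an illustrative name only).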
364    * @return true if this storefile was created by bulk load.
365    */
366   boolean isBulkLoadResult() {
367     boolean bulkLoadedHFile = false;
368     String fileName = this.getPath().getName();
369     int startPos = fileName.indexOf("SeqId_");
370     if (startPos != -1) {
371       bulkLoadedHFile = true;
372     }
373     return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
374   }
375 
376   /**
377    * Return the timestamp at which this bulk load file was generated.
378    */
379   public long getBulkLoadTimestamp() {
380     byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
381     return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
382   }
383 
384   /**
385    * @return the cached value of HDFS blocks distribution. The cached value is
386    * calculated when store file is opened.
387    */
388   public HDFSBlocksDistribution getHDFSBlockDistribution() {
389     return this.fileInfo.getHDFSBlockDistribution();
390   }
391 
392   /**
393    * Opens the reader on this store file.  Called from {@link #createReader(boolean)}.
394    * @return Reader for the store file.
395    * @throws IOException
396    * @see #closeReader(boolean)
397    */
398   private Reader open(boolean canUseDropBehind) throws IOException {
399     if (this.reader != null) {
400       throw new IllegalAccessError("Already open");
401     }
402 
403     // Open the StoreFile.Reader
404     this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
405 
406     // Load up indices and fileinfo. This also loads Bloom filter type.
407     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
408 
409     // Read in our metadata.
410     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
411     if (b != null) {
412       // By convention, if this is a half hfile, the top half has a sequence
413       // number greater than the bottom half. That's why we add one below. It's
414       // done in case the two halves are ever merged back together (rare).
415       // Without it, on open of the store, since store files are distinguished
416       // by sequence id, one half would subsume the other.
417       this.sequenceid = Bytes.toLong(b);
418       if (fileInfo.isTopReference()) {
419         this.sequenceid += 1;
420       }
421     }
422 
423     if (isBulkLoadResult()) {
424       // generate the sequenceId from the fileName
425       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
426       String fileName = this.getPath().getName();
427       // Use lastIndexOf() to get the last, most recent bulk load seqId.
428       int startPos = fileName.lastIndexOf("SeqId_");
429       if (startPos != -1) {
430         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
431             fileName.indexOf('_', startPos + 6)));
432         // Handle reference files as done above.
433         if (fileInfo.isTopReference()) {
434           this.sequenceid += 1;
435         }
436       }
437       // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
438       // In mob compaction, the hfile whose cells contain the path of a new mob file is bulk
439       // loaded into hbase; these cells have the same seqIds as the old ones. We do not want
440       // to assign new seqIds to them, since this might make a mess of the visibility of cells
441       // that have the same row key but different seqIds.
442       this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
443       this.reader.setBulkLoaded(true);
444     }
445     this.reader.setSequenceID(this.sequenceid);
446 
447     b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
448     if (b != null) {
449       this.maxMemstoreTS = Bytes.toLong(b);
450     }
451 
452     b = metadataMap.get(MAJOR_COMPACTION_KEY);
453     if (b != null) {
454       boolean mc = Bytes.toBoolean(b);
455       if (this.majorCompaction == null) {
456         this.majorCompaction = new AtomicBoolean(mc);
457       } else {
458         this.majorCompaction.set(mc);
459       }
460     } else {
461       // Presume it is not major compacted if it doesn't explicitly say so;
462       // HFileOutputFormat explicitly sets the major compacted key.
463       this.majorCompaction = new AtomicBoolean(false);
464     }
465 
466     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
467     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
468 
469     BloomType hfileBloomType = reader.getBloomFilterType();
470     if (cfBloomType != BloomType.NONE) {
471       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
472       if (hfileBloomType != cfBloomType) {
473         LOG.info("HFile Bloom filter type for "
474             + reader.getHFileReader().getName() + ": " + hfileBloomType
475             + ", but " + cfBloomType + " specified in column family "
476             + "configuration");
477       }
478     } else if (hfileBloomType != BloomType.NONE) {
479       LOG.info("Bloom filter turned off by CF config for "
480           + reader.getHFileReader().getName());
481     }
482 
483     // load delete family bloom filter
484     reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
485 
486     try {
487       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
488       if (timerangeBytes != null) {
489         this.reader.timeRangeTracker = new TimeRangeTracker();
490         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
491       }
492     } catch (IllegalArgumentException e) {
493       LOG.error("Error reading timestamp range data from meta -- " +
494           "proceeding without", e);
495       this.reader.timeRangeTracker = null;
496     }
497     // Initialize these so we can reuse them after the reader is closed.
498     firstKey = reader.getFirstKey();
499     lastKey = reader.getLastKey();
500     comparator = reader.getComparator();
501     return this.reader;
502   }
503 
504   public Reader createReader() throws IOException {
505     return createReader(false);
506   }
507 
508   /**
509    * @return Reader for this StoreFile. Creates one if necessary.
510    * @throws IOException
511    */
512   public Reader createReader(boolean canUseDropBehind) throws IOException {
513     if (this.reader == null) {
514       try {
515         this.reader = open(canUseDropBehind);
516       } catch (IOException e) {
517         try {
518           this.closeReader(true);
519         } catch (IOException ee) {
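              // Ignore the failure to close; the original exception is rethrown below.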
520         }
521         throw e;
522       }
523 
524     }
525     return this.reader;
526   }
527 
528   /**
529    * @return Current reader.  Must call createReader() first, else this returns null.
530    * @see #createReader()
531    */
532   public Reader getReader() {
533     return this.reader;
534   }
535 
536   /**
537    * @param evictOnClose whether to evict blocks belonging to this file
538    * @throws IOException
539    */
540   public synchronized void closeReader(boolean evictOnClose)
541       throws IOException {
542     if (this.reader != null) {
543       this.reader.close(evictOnClose);
544       this.reader = null;
545     }
546   }
547 
548   /**
549    * Delete this file
550    * @throws IOException
551    */
552   public void deleteReader() throws IOException {
553     closeReader(true);
554     this.fs.delete(getPath(), true);
555   }
556 
557   @Override
558   public String toString() {
559     return this.fileInfo.toString();
560   }
561 
562   /**
563    * @return a lengthy description of this StoreFile, suitable for debug output
564    */
565   public String toStringDetailed() {
566     StringBuilder sb = new StringBuilder();
567     sb.append(this.getPath().toString());
568     sb.append(", isReference=").append(isReference());
569     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
570     if (isBulkLoadResult()) {
571       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
572     } else {
573       sb.append(", seqid=").append(getMaxSequenceId());
574     }
575     sb.append(", majorCompaction=").append(isMajorCompaction());
576 
577     return sb.toString();
578   }
579 
580   /**
581    * Gets whether to skip resetting the sequence id for cells.
582    * @param skipResetSeqId The boolean flag serialized as a byte array.
583    * @return Whether to skip resetting the sequence id.
584    */
585   private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
586     if (skipResetSeqId != null && skipResetSeqId.length == 1) {
587       return Bytes.toBoolean(skipResetSeqId);
588     }
589     return false;
590   }
591 
592   public static class WriterBuilder {
593     private final Configuration conf;
594     private final CacheConfig cacheConf;
595     private final FileSystem fs;
596 
597     private CellComparator comparator = CellComparator.COMPARATOR;
598     private BloomType bloomType = BloomType.NONE;
599     private long maxKeyCount = 0;
600     private Path dir;
601     private Path filePath;
602     private InetSocketAddress[] favoredNodes;
603     private HFileContext fileContext;
604     private boolean shouldDropCacheBehind = false;
605 
606     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
607         FileSystem fs) {
608       this.conf = conf;
609       this.cacheConf = cacheConf;
610       this.fs = fs;
611     }
612 
613     /**
614      * Use either this method or {@link #withFilePath}, but not both.
615      * @param dir Path to column family directory. The directory is created if
616      *          it does not exist. The file is given a unique name within this
617      *          directory.
618      * @return this (for chained invocation)
619      */
620     public WriterBuilder withOutputDir(Path dir) {
621       Preconditions.checkNotNull(dir);
622       this.dir = dir;
623       return this;
624     }
625 
626     /**
627      * Use either this method or {@link #withOutputDir}, but not both.
628      * @param filePath the StoreFile path to write
629      * @return this (for chained invocation)
630      */
631     public WriterBuilder withFilePath(Path filePath) {
632       Preconditions.checkNotNull(filePath);
633       this.filePath = filePath;
634       return this;
635     }
636 
637     /**
638      * @param favoredNodes an array of favored nodes or possibly null
639      * @return this (for chained invocation)
640      */
641     public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
642       this.favoredNodes = favoredNodes;
643       return this;
644     }
645 
646     public WriterBuilder withComparator(CellComparator comparator) {
647       Preconditions.checkNotNull(comparator);
648       this.comparator = comparator;
649       return this;
650     }
651 
652     public WriterBuilder withBloomType(BloomType bloomType) {
653       Preconditions.checkNotNull(bloomType);
654       this.bloomType = bloomType;
655       return this;
656     }
657 
658     /**
659      * @param maxKeyCount estimated maximum number of keys we expect to add
660      * @return this (for chained invocation)
661      */
662     public WriterBuilder withMaxKeyCount(long maxKeyCount) {
663       this.maxKeyCount = maxKeyCount;
664       return this;
665     }
666 
667     public WriterBuilder withFileContext(HFileContext fileContext) {
668       this.fileContext = fileContext;
669       return this;
670     }
671 
672     public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
673       this.shouldDropCacheBehind = shouldDropCacheBehind;
674       return this;
675     }
676     /**
677      * Create a store file writer. The client is responsible for closing the
678      * file when done. If you have metadata to add, do so BEFORE closing, using
679      * {@link Writer#appendMetadata}.
680      */
681     public Writer build() throws IOException {
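          // Exactly one of output directory and file path must be specified.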
682       if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
683         throw new IllegalArgumentException("Either specify parent directory " +
684             "or file path");
685       }
686 
687       if (dir == null) {
688         dir = filePath.getParent();
689       }
690 
691       if (!fs.exists(dir)) {
692         fs.mkdirs(dir);
693       }
694 
695       if (filePath == null) {
696         filePath = getUniqueFile(fs, dir);
697         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
698           bloomType = BloomType.NONE;
699         }
700       }
701 
702       if (comparator == null) {
703         comparator = CellComparator.COMPARATOR;
704       }
705       return new Writer(fs, filePath,
706           conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
707     }
708   }
709 
710   /**
711    * @param fs the FileSystem on which <code>dir</code> resides
712    * @param dir Directory to create file in.
713    * @return random filename inside passed <code>dir</code>
714    */
715   public static Path getUniqueFile(final FileSystem fs, final Path dir)
716       throws IOException {
717     if (!fs.getFileStatus(dir).isDirectory()) {
718       throw new IOException("Expecting " + dir.toString() +
719         " to be a directory");
720     }
721     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
722   }
723 
724   public Long getMinimumTimestamp() {
725     return (getReader().timeRangeTracker == null) ?
726         null :
727         getReader().timeRangeTracker.getMinimumTimestamp();
728   }
729 
730   /**
731    * Gets the approximate mid-point of this file that is optimal for use in splitting it.
732    * @param comparator Comparator used to compare KVs.
733    * @return The split point row, or null if splitting is not possible, or reader is null.
734    */
735   @SuppressWarnings("deprecation")
736   byte[] getFileSplitPoint(CellComparator comparator) throws IOException {
737     if (this.reader == null) {
738       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
739       return null;
740     }
741     // Get first, last, and mid keys.  Midkey is the key that starts block
742     // in middle of hfile.  Has column and timestamp.  Need to return just
743     // the row we want to split on as midkey.
744     Cell midkey = this.reader.midkey();
745     if (midkey != null) {
746       Cell firstKey = this.reader.getFirstKey();
747       Cell lastKey = this.reader.getLastKey();
748       // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
749       if (comparator.compareRows(midkey, firstKey) == 0
750           || comparator.compareRows(midkey, lastKey) == 0) {
751         if (LOG.isDebugEnabled()) {
752           LOG.debug("cannot split because midkey is the same as first or last row");
753         }
754         return null;
755       }
756       return CellUtil.cloneRow(midkey);
757     }
758     return null;
759   }
760 
761   /**
762    * A StoreFile writer.  Use this to write HBase Store Files. It is package
763    * local because it is an implementation detail of the HBase regionserver.
764    */
765   public static class Writer implements Compactor.CellSink {
766     private final BloomFilterWriter generalBloomFilterWriter;
767     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
768     private final BloomType bloomType;
769     private byte[] lastBloomKey;
770     private int lastBloomKeyOffset, lastBloomKeyLen;
771     private Cell lastCell = null;
772     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
773     private Cell lastDeleteFamilyCell = null;
774     private long deleteFamilyCnt = 0;
775 
776     /** Bytes per Checksum */
777     protected int bytesPerChecksum;
778 
779     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
780     /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
781      * been set. When flushing a memstore, we set the TimeRange and use this
782      * variable to indicate that it doesn't need to be calculated again while
783      * appending KeyValues.
784      * It is not set for compactions, when it is recalculated using only
785      * the appended KeyValues. */
786     boolean isTimeRangeTrackerSet = false;
787 
788     protected HFile.Writer writer;
789     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
790 
791     /**
792      * Creates an HFile.Writer that also writes helpful metadata.
793      * @param fs file system to write to
794      * @param path file name to create
795      * @param conf user configuration
796      * @param comparator key comparator
797      * @param bloomType bloom filter setting
798      * @param maxKeys the expected maximum number of keys to be added. Was used
799      *        for Bloom filter size in {@link HFile} format version 1.
800      * @param favoredNodes an array of favored nodes or possibly null
801      * @param fileContext - The HFile context
802      * @throws IOException problem writing to FS
803      */
804     private Writer(FileSystem fs, Path path,
805         final Configuration conf,
806         CacheConfig cacheConf,
807         final CellComparator comparator, BloomType bloomType, long maxKeys,
808         InetSocketAddress[] favoredNodes, HFileContext fileContext)
809             throws IOException {
810       writer = HFile.getWriterFactory(conf, cacheConf)
811           .withPath(fs, path)
812           .withComparator(comparator)
813           .withFavoredNodes(favoredNodes)
814           .withFileContext(fileContext)
815           .create();
816 
817       generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
818           conf, cacheConf, bloomType,
819           (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
820 
821       if (generalBloomFilterWriter != null) {
822         this.bloomType = bloomType;
823         if (this.bloomType == BloomType.ROWCOL) {
824           lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue();
825         }
826         if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " +
827           this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName());
828       } else {
829         // Not using Bloom filters.
830         this.bloomType = BloomType.NONE;
831       }
832 
833       // initialize delete family Bloom filter when there is NO RowCol Bloom
834       // filter
835       if (this.bloomType != BloomType.ROWCOL) {
836         this.deleteFamilyBloomFilterWriter = BloomFilterFactory
837             .createDeleteBloomAtWrite(conf, cacheConf,
838                 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
839       } else {
840         deleteFamilyBloomFilterWriter = null;
841       }
842       if (deleteFamilyBloomFilterWriter != null) {
843         if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
844             + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
845       }
846     }
847 
848     /**
849      * Writes metadata.
850      * Call before {@link #close()} since it's written as metadata to this file.
851      * @param maxSequenceId Maximum sequence id.
852      * @param majorCompaction True if this file is product of a major compaction
853      * @throws IOException problem writing to FS
854      */
855     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
856     throws IOException {
857       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
858       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
859           Bytes.toBytes(majorCompaction));
860       appendTrackedTimestampsToMetadata();
861     }
862 
863     /**
864      * Writes metadata.
865      * Call before {@link #close()} since it's written as metadata to this file.
866      * @param maxSequenceId Maximum sequence id.
867      * @param majorCompaction True if this file is product of a major compaction
868      * @param mobCellsCount The number of mob cells.
869      * @throws IOException problem writing to FS
870      */
871     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
872         final long mobCellsCount) throws IOException {
873       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
874       writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
875       writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
876       appendTrackedTimestampsToMetadata();
877     }
878 
879     /**
880      * Add the timestamp range and the earliest put timestamp to the metadata.
881      */
882     public void appendTrackedTimestampsToMetadata() throws IOException {
883       appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
884       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
885     }
886 
887     /**
888      * Set TimeRangeTracker
889      * @param trt
890      */
891     public void setTimeRangeTracker(final TimeRangeTracker trt) {
892       this.timeRangeTracker = trt;
893       isTimeRangeTrackerSet = true;
894     }
895 
896     /**
897      * Record the earliest Put timestamp.
898      *
899      * If the timeRangeTracker is not set,
900      * update TimeRangeTracker to include the timestamp of this key
901      * @param cell
902      */
903     public void trackTimestamps(final Cell cell) {
904       if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
905         earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
906       }
907       if (!isTimeRangeTrackerSet) {
908         timeRangeTracker.includeTimestamp(cell);
909       }
910     }
911 
912     private void appendGeneralBloomfilter(final Cell cell) throws IOException {
913       if (this.generalBloomFilterWriter != null) {
914         // only add to the bloom filter on a new, unique key
915         boolean newKey = true;
916         if (this.lastCell != null) {
917           switch(bloomType) {
918           case ROW:
919             newKey = ! CellUtil.matchingRows(cell, lastCell);
920             break;
921           case ROWCOL:
922             newKey = ! CellUtil.matchingRowColumn(cell, lastCell);
923             break;
924           case NONE:
925             newKey = false;
926             break;
927           default:
928             throw new IOException("Invalid Bloom filter type: " + bloomType +
929                 " (ROW or ROWCOL expected)");
930           }
931         }
932         if (newKey) {
933           /*
934            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
935            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
936            *
937            * 2 Types of Filtering:
938            *  1. Row = Row
939            *  2. RowCol = Row + Qualifier
940            */
941           byte[] bloomKey = null;
942           // Used with ROW_COL bloom
943           KeyValue bloomKeyKV = null;
944           int bloomKeyOffset, bloomKeyLen;
945 
946           switch (bloomType) {
947           case ROW:
948             bloomKey = cell.getRowArray();
949             bloomKeyOffset = cell.getRowOffset();
950             bloomKeyLen = cell.getRowLength();
951             break;
952           case ROWCOL:
953             // merge(row, qualifier)
954             // TODO: could save one buffer copy in case of compound Bloom
955             // filters when this involves creating a KeyValue
956             // TODO : Handle while writes also
957             bloomKeyKV = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
958                 cell.getRowLength(), 
959                 HConstants.EMPTY_BYTE_ARRAY, 0, 0, cell.getQualifierArray(),
960                 cell.getQualifierOffset(),
961                 cell.getQualifierLength());
962             bloomKey = bloomKeyKV.getBuffer();
963             bloomKeyOffset = bloomKeyKV.getKeyOffset();
964             bloomKeyLen = bloomKeyKV.getKeyLength();
965             break;
966           default:
967             throw new IOException("Invalid Bloom filter type: " + bloomType +
968                 " (ROW or ROWCOL expected)");
969           }
970           generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
971           if (lastBloomKey != null) {
972             int res = 0;
973             // hbase:meta does not have blooms. So we need not have special interpretation
974             // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
975             if (bloomType == BloomType.ROW) {
976               res = Bytes.BYTES_RAWCOMPARATOR.compare(bloomKey, bloomKeyOffset, bloomKeyLen,
977                   lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen);
978             } else {
979               // TODO : Caching of kv components becomes important in these cases
980               res = CellComparator.COMPARATOR.compare(bloomKeyKV, lastBloomKeyOnlyKV);
981             }
982             if (res <= 0) {
983               throw new IOException("Non-increasing Bloom keys: "
984                   + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + " after "
985                   + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen));
986             }
987           }
988           lastBloomKey = bloomKey;
989           lastBloomKeyOffset = bloomKeyOffset;
990           lastBloomKeyLen = bloomKeyLen;
991           if (bloomType == BloomType.ROWCOL) {
992             lastBloomKeyOnlyKV.setKey(bloomKey, bloomKeyOffset, bloomKeyLen);
993           }
994           this.lastCell = cell;
995         }
996       }
997     }
998 
999     private void appendDeleteFamilyBloomFilter(final Cell cell)
1000         throws IOException {
1001       if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
1002         return;
1003       }
1004 
1005       // increase the number of delete family in the store file
1006       deleteFamilyCnt++;
1007       if (null != this.deleteFamilyBloomFilterWriter) {
1008         boolean newKey = true;
1009         if (lastDeleteFamilyCell != null) {
1010           // hbase:meta does not have blooms. So we need not have special interpretation
1011           // of the hbase:meta cells
1012           newKey = !CellUtil.matchingRows(cell, lastDeleteFamilyCell);
1013         }
1014         if (newKey) {
1015           this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
1016               cell.getRowOffset(), cell.getRowLength());
1017           this.lastDeleteFamilyCell = cell;
1018         }
1019       }
1020     }
1021 
1022     public void append(final Cell cell) throws IOException {
1023       appendGeneralBloomfilter(cell);
1024       appendDeleteFamilyBloomFilter(cell);
1025       writer.append(cell);
1026       trackTimestamps(cell);
1027     }
1028 
1029     public Path getPath() {
1030       return this.writer.getPath();
1031     }
1032 
1033     public boolean hasGeneralBloom() {
1034       return this.generalBloomFilterWriter != null;
1035     }
1036 
1037     /**
1038      * For unit testing only.
1039      *
1040      * @return the Bloom filter used by this writer.
1041      */
1042     BloomFilterWriter getGeneralBloomWriter() {
1043       return generalBloomFilterWriter;
1044     }
1045 
1046     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
1047       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
1048       if (haveBloom) {
1049         bfw.compactBloom();
1050       }
1051       return haveBloom;
1052     }
1053 
1054     private boolean closeGeneralBloomFilter() throws IOException {
1055       boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
1056 
1057       // add the general Bloom filter writer and append file info
1058       if (hasGeneralBloom) {
1059         writer.addGeneralBloomFilter(generalBloomFilterWriter);
1060         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
1061             Bytes.toBytes(bloomType.toString()));
1062         if (lastBloomKey != null) {
1063           writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
1064               lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
1065                   + lastBloomKeyLen));
1066         }
1067       }
1068       return hasGeneralBloom;
1069     }
1070 
1071     private boolean closeDeleteFamilyBloomFilter() throws IOException {
1072       boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
1073 
1074       // add the delete family Bloom filter writer
1075       if (hasDeleteFamilyBloom) {
1076         writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
1077       }
1078 
1079       // append file info about the number of delete family kvs
1080       // even if there is no delete family Bloom.
1081       writer.appendFileInfo(DELETE_FAMILY_COUNT,
1082           Bytes.toBytes(this.deleteFamilyCnt));
1083 
1084       return hasDeleteFamilyBloom;
1085     }
1086 
1087     public void close() throws IOException {
1088       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
1089       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
1090 
1091       writer.close();
1092 
1093       // Log final Bloom filter statistics. This needs to be done after close()
1094       // because compound Bloom filters might be finalized as part of closing.
1095       if (StoreFile.LOG.isTraceEnabled()) {
1096         StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
1097           (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " were added to HFile " +
1098           getPath());
1099       }
1100 
1101     }
1102 
1103     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1104       writer.appendFileInfo(key, value);
1105     }
1106 
1107     /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
1108      */
1109     HFile.Writer getHFileWriter() {
1110       return writer;
1111     }
1112   }
1113 
1114   /**
1115    * Reader for a StoreFile.
1116    */
1117   public static class Reader {
1118     private static final Log LOG = LogFactory.getLog(Reader.class.getName());
1119 
1120     protected BloomFilter generalBloomFilter = null;
1121     protected BloomFilter deleteFamilyBloomFilter = null;
1122     protected BloomType bloomFilterType;
1123     private final HFile.Reader reader;
1124     protected TimeRangeTracker timeRangeTracker = null;
1125     protected long sequenceID = -1;
1126     private byte[] lastBloomKey;
1127     private long deleteFamilyCnt = -1;
1128     private boolean bulkLoadResult = false;
1129     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
1130     private boolean skipResetSeqId = true;
1131 
1132     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
1133         throws IOException {
1134       reader = HFile.createReader(fs, path, cacheConf, conf);
1135       bloomFilterType = BloomType.NONE;
1136     }
1137 
1138     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
1139         CacheConfig cacheConf, Configuration conf) throws IOException {
1140       reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
1141       bloomFilterType = BloomType.NONE;
1142     }
1143 
1144     public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
1145       reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
1146     }
1147     public boolean isPrimaryReplicaReader() {
1148       return reader.isPrimaryReplicaReader();
1149     }
1150 
1151     /**
1152      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
1153      */
1154     Reader() {
1155       this.reader = null;
1156     }
1157 
1158     public CellComparator getComparator() {
1159       return reader.getComparator();
1160     }
1161 
1162     /**
1163      * Get a scanner to scan over this StoreFile. Do not use
1164      * this overload if using this scanner for compactions.
1165      *
1166      * @param cacheBlocks should this scanner cache blocks?
1167      * @param pread use pread (for highly concurrent small readers)
1168      * @return a scanner
1169      */
1170     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1171                                                boolean pread) {
1172       return getStoreFileScanner(cacheBlocks, pread, false,
1173         // 0 is passed as the readpoint because this method is only used by tests
1174         // where the StoreFile is directly operated upon
1175         0);
1176     }
1177 
1178     /**
1179      * Get a scanner to scan over this StoreFile.
1180      *
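         * <p>For example, a compaction might open a scanner as
         * <code>getStoreFileScanner(false, false, true, readPt)</code>: no block
         * caching, no pread, compaction mode. Here <code>readPt</code> stands for
         * the caller's read point and is only a placeholder.
         *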
1181      * @param cacheBlocks should this scanner cache blocks?
1182      * @param pread use pread (for highly concurrent small readers)
1183      * @param isCompaction is scanner being used for compaction?
1184      * @return a scanner
1185      */
1186     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1187                                                boolean pread,
1188                                                boolean isCompaction, long readPt) {
1189       return new StoreFileScanner(this,
1190                                  getScanner(cacheBlocks, pread, isCompaction),
1191                                  !isCompaction, reader.hasMVCCInfo(), readPt);
1192     }
1193 
1194     /**
1195      * Warning: Do not write further code which depends on this call. Instead
1196      * use getStoreFileScanner(), which uses the StoreFileScanner class/interface;
1197      * that is the preferred way to scan a store with higher-level concepts.
1198      *
1199      * @param cacheBlocks should we cache the blocks?
1200      * @param pread use pread (for concurrent small readers)
1201      * @return the underlying HFileScanner
1202      */
1203     @Deprecated
1204     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1205       return getScanner(cacheBlocks, pread, false);
1206     }
1207 
1208     /**
1209      * Warning: Do not write further code which depends on this call. Instead
1210      * use getStoreFileScanner(), which uses the StoreFileScanner class/interface;
1211      * that is the preferred way to scan a store with higher-level concepts.
1212      *
1213      * @param cacheBlocks
1214      *          should we cache the blocks?
1215      * @param pread
1216      *          use pread (for concurrent small readers)
1217      * @param isCompaction
1218      *          is scanner being used for compaction?
1219      * @return the underlying HFileScanner
1220      */
1221     @Deprecated
1222     public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1223         boolean isCompaction) {
1224       return reader.getScanner(cacheBlocks, pread, isCompaction);
1225     }
1226 
1227     public void close(boolean evictOnClose) throws IOException {
1228       reader.close(evictOnClose);
1229     }
1230 
1231     /**
1232      * Check if this storeFile may contain keys within the TimeRange that
1233      * have not expired (i.e. not older than oldestUnexpiredTS).
1234      * @param scan the current scan
1235      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
1236      *          determined by the column family's TTL
1237      * @return false if queried keys definitely don't exist in this StoreFile
1238      */
1239     boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
1240       if (timeRangeTracker == null) {
1241         return true;
1242       } else {
1243         return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
1244             timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1245       }
1246     }
1247 
1248     /**
1249      * Checks whether the given scan passes the Bloom filter (if present). Only
1250      * checks Bloom filters for single-row or single-row-column scans. Bloom
1251      * filter checking for multi-gets is implemented as part of the store
1252      * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
1253      * the lower-level API {@link #passesGeneralRowBloomFilter(byte[], int, int)}
1254      * and {@link #passesGeneralRowColBloomFilter(Cell)}.
1255      *
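         * <p>A usage sketch for a single-row "get" scan; <code>row</code> and
         * <code>columns</code> are placeholders:
         * <pre>
         * Scan scan = new Scan(new Get(row));
         * if (passesBloomFilter(scan, columns)) {
         *   // the file may contain the row, so it must be read
         * }
         * </pre>
         *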
1256      * @param scan the scan specification. Used to determine the row, and to
1257      *          check whether this is a single-row ("get") scan.
1258      * @param columns the set of columns. Only used for row-column Bloom
1259      *          filters.
1260      * @return true if the scan with the given column set passes the Bloom
1261      *         filter, or if the Bloom filter is not applicable for the scan.
1262      *         False if the Bloom filter is applicable and the scan fails it.
1263      */
1264      boolean passesBloomFilter(Scan scan,
1265         final SortedSet<byte[]> columns) {
1266       // Multi-column non-get scans will use Bloom filters through the
1267       // lower-level API function that this function calls.
1268       if (!scan.isGetScan()) {
1269         return true;
1270       }
1271 
1272       byte[] row = scan.getStartRow();
1273       switch (this.bloomFilterType) {
1274         case ROW:
1275           return passesGeneralRowBloomFilter(row, 0, row.length);
1276 
1277         case ROWCOL:
1278           if (columns != null && columns.size() == 1) {
1279             byte[] column = columns.first();
1280             // create the required fake key
1281             Cell kvKey = KeyValueUtil.createFirstOnRow(row, 0, row.length,
1282               HConstants.EMPTY_BYTE_ARRAY, 0, 0, column, 0,
1283               column.length);
1284             return passesGeneralRowColBloomFilter(kvKey);
1285           }
1286 
1287           // For multi-column queries the Bloom filter is checked from the
1288           // seekExact operation.
1289           return true;
1290 
1291         default:
1292           return true;
1293       }
1294     }
1295 
1296     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1297         int rowLen) {
1298       // Cache Bloom filter as a local variable in case it is set to null by
1299       // another thread on an IO error.
1300       BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1301 
1302       // Empty file or there is no delete family at all
1303       if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1304         return false;
1305       }
1306 
1307       if (bloomFilter == null) {
1308         return true;
1309       }
1310 
1311       try {
1312         if (!bloomFilter.supportsAutoLoading()) {
1313           return true;
1314         }
1315         return bloomFilter.contains(row, rowOffset, rowLen, null);
1316       } catch (IllegalArgumentException e) {
1317         LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1318             e);
1319         setDeleteFamilyBloomFilterFaulty();
1320       }
1321 
1322       return true;
1323     }
1324 
1325     /**
1326      * A method for checking Bloom filters. Called directly from
1327      * StoreFileScanner in case of a multi-column query.
1328      *
1329      * @param row
1330      * @param rowOffset
1331      * @param rowLen
1332      * @return True if passes
1333      */
1334     public boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
1335       BloomFilter bloomFilter = this.generalBloomFilter;
1336       if (bloomFilter == null) {
1337         return true;
1338       }
1339 
1340       // Used in ROW bloom
1341       byte[] key = null;
1342       if (rowOffset != 0 || rowLen != row.length) {
1343         throw new AssertionError(
1344             "For row-only Bloom filters the row " + "must occupy the whole array");
1345       }
1346       key = row;
1347       return checkGeneralBloomFilter(key, null, bloomFilter);
1348     }
1349 
1350     /**
1351      * A method for checking Bloom filters. Called directly from
1352      * StoreFileScanner in case of a multi-column query.
1353      *
1354      * @param cell
1355      *          the cell to check if present in BloomFilter
1356      * @return True if passes
1357      */
1358     public boolean passesGeneralRowColBloomFilter(Cell cell) {
1359       BloomFilter bloomFilter = this.generalBloomFilter;
1360       if (bloomFilter == null) {
1361         return true;
1362       }
1363       // Used in ROW_COL bloom
1364       Cell kvKey = null;
1365       // If the incoming key is already a fake rowcol key then use it as-is
1366       if (cell.getTypeByte() == KeyValue.Type.Maximum.getCode() && cell.getFamilyLength() == 0) {
1367         kvKey = cell;
1368       } else {
1369         kvKey = CellUtil.createFirstOnRowCol(cell);
1370       }
1371       return checkGeneralBloomFilter(null, kvKey, bloomFilter);
1372     }
1373 
1374     private boolean checkGeneralBloomFilter(byte[] key, Cell kvKey, BloomFilter bloomFilter) {
1375       // Empty file
1376       if (reader.getTrailer().getEntryCount() == 0)
1377         return false;
1378       HFileBlock bloomBlock = null;
1379       try {
1380         boolean shouldCheckBloom;
1381         ByteBuff bloom;
1382         if (bloomFilter.supportsAutoLoading()) {
1383           bloom = null;
1384           shouldCheckBloom = true;
1385         } else {
1386           bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true);
1387           bloom = bloomBlock.getBufferWithoutHeader();
1388           shouldCheckBloom = bloom != null;
1389         }
1390 
1391         if (shouldCheckBloom) {
1392           boolean exists;
1393 
1394           // Whether the primary Bloom key is greater than the last Bloom key
1395           // from the file info. For row-column Bloom filters this is not yet
1396           // a sufficient condition to return false.
1397           boolean keyIsAfterLast = (lastBloomKey != null);
1398           // hbase:meta does not have blooms. So we need not have special interpretation
1399           // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
1400           if (keyIsAfterLast) {
1401             if (bloomFilterType == BloomType.ROW) {
1402               keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
1403             } else {
1404               keyIsAfterLast = (CellComparator.COMPARATOR.compare(kvKey, lastBloomKeyOnlyKV)) > 0;
1405             }
1406           }
1407 
1408           if (bloomFilterType == BloomType.ROWCOL) {
1409             // Since a Row Delete is essentially a DeleteFamily applied to all
1410             // columns, a file might be skipped if using row+col Bloom filter.
1411             // In order to ensure this file is included, an additional check is
1412             // required, looking only for a row bloom.
1413             Cell rowBloomKey = CellUtil.createFirstOnRow(kvKey);
1414             // hbase:meta does not have blooms. So we need not have special interpretation
1415             // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
1416             if (keyIsAfterLast
1417                 && (CellComparator.COMPARATOR.compare(rowBloomKey, lastBloomKeyOnlyKV)) > 0) {
1418               exists = false;
1419             } else {
1420               exists =
1421                   bloomFilter.contains(kvKey, bloom) ||
1422                   bloomFilter.contains(rowBloomKey, bloom);
1423             }
1424           } else {
1425             exists = !keyIsAfterLast
1426                 && bloomFilter.contains(key, 0, key.length, bloom);
1427           }
1428 
1429           return exists;
1430         }
1431       } catch (IOException e) {
1432         LOG.error("Error reading bloom filter data -- proceeding without",
1433             e);
1434         setGeneralBloomFilterFaulty();
1435       } catch (IllegalArgumentException e) {
1436         LOG.error("Bad bloom filter data -- proceeding without", e);
1437         setGeneralBloomFilterFaulty();
1438       } finally {
1439         // Return the bloom block so that its ref count can be decremented.
1440         reader.returnBlock(bloomBlock);
1441       }
1442       return true;
1443     }
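
    /*
     * A self-contained sketch (not HBase code) of the Bloom-filter contract
     * that checkGeneralBloomFilter relies on: contains() may wrongly answer
     * true (a false positive) but never wrongly answers false, which is why
     * every error path above falls through to "return true".
     *
     *   import java.util.BitSet;
     *
     *   final class TinyBloom {
     *     private static final int SIZE = 1 << 16;
     *     private final BitSet bits = new BitSet(SIZE);
     *
     *     void add(byte[] key) {
     *       for (int seed = 0; seed < 3; seed++) {
     *         bits.set(hash(key, seed));
     *       }
     *     }
     *
     *     boolean contains(byte[] key) {
     *       for (int seed = 0; seed < 3; seed++) {
     *         if (!bits.get(hash(key, seed))) {
     *           return false;             // definitely absent
     *         }
     *       }
     *       return true;                  // probably present (maybe a false positive)
     *     }
     *
     *     private int hash(byte[] key, int seed) {
     *       int h = 31 * seed + 17;
     *       for (byte b : key) {
     *         h = h * 31 + b;
     *       }
     *       return (h & 0x7FFFFFFF) % SIZE;
     *     }
     *   }
     */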
1444 
1445     /**
1446      * Checks whether the given scan rowkey range overlaps with the current storefile's key range.
1447      * @param scan the scan specification. Used to determine the rowkey range.
1448      * @return true if there is overlap, false otherwise
1449      */
1450     public boolean passesKeyRangeFilter(Scan scan) {
1451       if (this.getFirstKey() == null || this.getLastKey() == null) {
1452         // the file is empty
1453         return false;
1454       }
1455       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1456           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1457         return true;
1458       }
1459       byte[] smallestScanRow = scan.isReversed() ? scan.getStopRow() : scan.getStartRow();
1460       byte[] largestScanRow = scan.isReversed() ? scan.getStartRow() : scan.getStopRow();
1461       Cell firstKeyKV = this.getFirstKey();
1462       Cell lastKeyKV = this.getLastKey();
1463       boolean nonOverlapping =
1464           (getComparator().compareRows(firstKeyKV, largestScanRow, 0, largestScanRow.length) > 0
1465               && !Bytes.equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
1466                   HConstants.EMPTY_END_ROW))
1467           || getComparator().compareRows(lastKeyKV, smallestScanRow, 0,
1468               smallestScanRow.length) < 0;
1469       return !nonOverlapping;
1470     }
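
    /*
     * The test above is the standard disjoint-interval check: the file's key
     * range [firstKey, lastKey] and the scan's row range miss each other iff
     * the file starts after the scan ends or ends before the scan starts,
     * with an empty stop row meaning "unbounded". A self-contained sketch
     * over plain ints (hypothetical, for illustration only):
     *
     *   static boolean overlaps(int fileFirst, int fileLast,
     *       int scanSmallest, int scanLargest) {
     *     boolean nonOverlapping = fileFirst > scanLargest
     *         || fileLast < scanSmallest;
     *     return !nonOverlapping;
     *   }
     */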
1471 
1472     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1473       Map<byte [], byte []> fi = reader.loadFileInfo();
1474 
1475       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1476       if (b != null) {
1477         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1478       }
1479 
1480       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1481       if (bloomFilterType == BloomType.ROWCOL && lastBloomKey != null) {
1482         lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length);
1483       }
1484       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1485       if (cnt != null) {
1486         deleteFamilyCnt = Bytes.toLong(cnt);
1487       }
1488 
1489       return fi;
1490     }
1491 
1492     public void loadBloomfilter() {
1493       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1494       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1495     }
1496 
1497     private void loadBloomfilter(BlockType blockType) {
1498       try {
1499         if (blockType == BlockType.GENERAL_BLOOM_META) {
1500           if (this.generalBloomFilter != null)
1501             return; // Bloom has been loaded
1502 
1503           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1504           if (bloomMeta != null) {
1505             // sanity check for NONE Bloom filter
1506             if (bloomFilterType == BloomType.NONE) {
1507               throw new IOException(
1508                   "Valid bloom filter type not found in FileInfo");
1509             } else {
1510               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1511                   reader);
1512               if (LOG.isTraceEnabled()) {
1513                 LOG.trace("Loaded " + bloomFilterType.toString() + " "
1514                   + generalBloomFilter.getClass().getSimpleName()
1515                   + " metadata for " + reader.getName());
1516               }
1517             }
1518           }
1519         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1520           if (this.deleteFamilyBloomFilter != null)
1521             return; // Bloom has been loaded
1522 
1523           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1524           if (bloomMeta != null) {
1525             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1526                 bloomMeta, reader);
1527             LOG.info("Loaded Delete Family Bloom ("
1528                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1529                 + ") metadata for " + reader.getName());
1530           }
1531         } else {
1532           throw new RuntimeException("Block Type: " + blockType.toString()
1533               + " is not supported for Bloom filter");
1534         }
1535       } catch (IOException e) {
1536         LOG.error("Error reading bloom filter meta for " + blockType
1537             + " -- proceeding without", e);
1538         setBloomFilterFaulty(blockType);
1539       } catch (IllegalArgumentException e) {
1540         LOG.error("Bad bloom filter meta " + blockType
1541             + " -- proceeding without", e);
1542         setBloomFilterFaulty(blockType);
1543       }
1544     }
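
    /*
     * loadBloomfilter above follows a load-once, degrade-gracefully pattern:
     * each filter is loaded at most once, and corrupt metadata disables the
     * filter rather than failing the read path. A generic sketch of the same
     * pattern (hypothetical Filter type and names, not HBase API):
     *
     *   private volatile Filter filter;   // null == not loaded or faulty
     *
     *   boolean mightContain(byte[] key) {
     *     Filter f = filter;
     *     if (f == null) {
     *       return true;                  // no filter: caller must check the file
     *     }
     *     try {
     *       return f.contains(key);
     *     } catch (RuntimeException e) {
     *       filter = null;                // fault the filter, keep serving reads
     *       return true;
     *     }
     *   }
     */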
1545 
1546     private void setBloomFilterFaulty(BlockType blockType) {
1547       if (blockType == BlockType.GENERAL_BLOOM_META) {
1548         setGeneralBloomFilterFaulty();
1549       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1550         setDeleteFamilyBloomFilterFaulty();
1551       }
1552     }
1553 
1554     /**
1555      * The number of Bloom filter entries in this store file, or an estimate
1556      * thereof, if the Bloom filter is not loaded. This always returns an upper
1557      * bound of the number of Bloom filter entries.
1558      *
1559      * @return an estimate of the number of Bloom filter entries in this file
1560      */
1561     public long getFilterEntries() {
1562       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1563           : reader.getEntries();
1564     }
1565 
1566     public void setGeneralBloomFilterFaulty() {
1567       generalBloomFilter = null;
1568     }
1569 
1570     public void setDeleteFamilyBloomFilterFaulty() {
1571       this.deleteFamilyBloomFilter = null;
1572     }
1573 
1574     public Cell getLastKey() {
1575       return reader.getLastKey();
1576     }
1577 
1578     public byte[] getLastRowKey() {
1579       return reader.getLastRowKey();
1580     }
1581 
1582     public Cell midkey() throws IOException {
1583       return reader.midkey();
1584     }
1585 
1586     public long length() {
1587       return reader.length();
1588     }
1589 
1590     public long getTotalUncompressedBytes() {
1591       return reader.getTrailer().getTotalUncompressedBytes();
1592     }
1593 
1594     public long getEntries() {
1595       return reader.getEntries();
1596     }
1597 
1598     public long getDeleteFamilyCnt() {
1599       return deleteFamilyCnt;
1600     }
1601 
1602     public Cell getFirstKey() {
1603       return reader.getFirstKey();
1604     }
1605 
1606     public long indexSize() {
1607       return reader.indexSize();
1608     }
1609 
1610     public BloomType getBloomFilterType() {
1611       return this.bloomFilterType;
1612     }
1613 
1614     public long getSequenceID() {
1615       return sequenceID;
1616     }
1617 
1618     public void setSequenceID(long sequenceID) {
1619       this.sequenceID = sequenceID;
1620     }
1621 
1622     public void setBulkLoaded(boolean bulkLoadResult) {
1623       this.bulkLoadResult = bulkLoadResult;
1624     }
1625 
1626     public boolean isBulkLoaded() {
1627       return this.bulkLoadResult;
1628     }
1629 
1630     BloomFilter getGeneralBloomFilter() {
1631       return generalBloomFilter;
1632     }
1633 
1634     long getUncompressedDataIndexSize() {
1635       return reader.getTrailer().getUncompressedDataIndexSize();
1636     }
1637 
1638     public long getTotalBloomSize() {
1639       if (generalBloomFilter == null)
1640         return 0;
1641       return generalBloomFilter.getByteSize();
1642     }
1643 
1644     public int getHFileVersion() {
1645       return reader.getTrailer().getMajorVersion();
1646     }
1647 
1648     public int getHFileMinorVersion() {
1649       return reader.getTrailer().getMinorVersion();
1650     }
1651 
1652     public HFile.Reader getHFileReader() {
1653       return reader;
1654     }
1655 
1656     void disableBloomFilterForTesting() {
1657       generalBloomFilter = null;
1658       this.deleteFamilyBloomFilter = null;
1659     }
1660 
1661     public long getMaxTimestamp() {
1662       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1663     }
1664 
1665     boolean isSkipResetSeqId() {
1666       return skipResetSeqId;
1667     }
1668 
1669     void setSkipResetSeqId(boolean skipResetSeqId) {
1670       this.skipResetSeqId = skipResetSeqId;
1671     }
1672   }
1673 
1674   /**
1675    * Useful comparators for comparing StoreFiles.
1676    */
1677   public abstract static class Comparators {
1678     /**
1679      * Comparator that compares based on the Sequence Ids of the
1680      * StoreFiles. Bulk loads that did not request a seq ID are given a
1681      * seq id of -1; thus, they are placed before all non-bulk loads and
1682      * before bulk loads that did request a sequence ID. Among files with
1683      * equal sequence ids, larger files sort first, then earlier bulk-load
1684      * time. If there are still ties, the path name is the tie-breaker.
1685      */
1686     public static final Comparator<StoreFile> SEQ_ID =
1687       Ordering.compound(ImmutableList.of(
1688           Ordering.natural().onResultOf(new GetSeqId()),
1689           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1690           Ordering.natural().onResultOf(new GetBulkTime()),
1691           Ordering.natural().onResultOf(new GetPathName())
1692       ));
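
    /*
     * For reference, an equivalent ordering written with Java 8 comparators
     * (a sketch, not part of this class; the reversed() mirrors the
     * GetFileSize ordering below, so larger files sort first among files
     * with the same sequence id):
     *
     *   Comparator<StoreFile> seqId =
     *       Comparator.comparingLong(StoreFile::getMaxSequenceId)
     *           .thenComparing(Comparator.comparingLong(
     *               (StoreFile sf) -> sf.getReader().length()).reversed())
     *           .thenComparingLong(sf -> sf.isBulkLoadResult()
     *               ? sf.getBulkLoadTimestamp() : Long.MAX_VALUE)
     *           .thenComparing(sf -> sf.getPath().getName());
     *
     *   // Usage: Collections.sort(storeFiles, seqId);
     */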
1693 
1694     private static class GetSeqId implements Function<StoreFile, Long> {
1695       @Override
1696       public Long apply(StoreFile sf) {
1697         return sf.getMaxSequenceId();
1698       }
1699     }
1700 
1701     private static class GetFileSize implements Function<StoreFile, Long> {
1702       @Override
1703       public Long apply(StoreFile sf) {
1704         return sf.getReader().length();
1705       }
1706     }
1707 
1708     private static class GetBulkTime implements Function<StoreFile, Long> {
1709       @Override
1710       public Long apply(StoreFile sf) {
1711         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1712         return sf.getBulkLoadTimestamp();
1713       }
1714     }
1715 
1716     private static class GetPathName implements Function<StoreFile, String> {
1717       @Override
1718       public String apply(StoreFile sf) {
1719         return sf.getPath().getName();
1720       }
1721     }
1722   }
1723 }