View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.NavigableSet;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellComparator;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.DoNotRetryIOException;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.IsolationLevel;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.executor.ExecutorService;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46  import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47  import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
53  /**
54   * Scanner scans both the memstore and the store files of a Store. It coalesces the
55   * stream of cells into a {@code List<Cell>} for a single row.
56   */
57  @InterfaceAudience.Private
58  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60    private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61    // In unit tests, the store could be null
62    protected final Store store;
63    protected ScanQueryMatcher matcher;
64    protected KeyValueHeap heap;
65    protected boolean cacheBlocks;
66  
67    protected int countPerRow = 0;
68    protected int storeLimit = -1;
69    protected int storeOffset = 0;
70  
71    // Used to indicate that the scanner has closed (see HBASE-1107)
72    // Doesnt need to be volatile because it's always accessed via synchronized methods
73    protected boolean closing = false;
74    protected final boolean get;
75    protected final boolean explicitColumnQuery;
76    protected final boolean useRowColBloom;
77    /**
78     * A flag that enables StoreFileScanner parallel-seeking
79     */
80    protected boolean parallelSeekEnabled = false;
81    protected ExecutorService executor;
82    protected final Scan scan;
83    protected final NavigableSet<byte[]> columns;
84    protected final long oldestUnexpiredTS;
85    protected final long now;
86    protected final int minVersions;
87    protected final long maxRowSize;
88    protected final long cellsPerHeartbeatCheck;
89  
90    // Collects all the KVHeap that are eagerly getting closed during the
91    // course of a scan
92    protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
93  
94    /**
95     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
96     * KVs skipped via seeking to next row/column. TODO: estimate them?
97     */
98    private long kvsScanned = 0;
99    private Cell prevCell = null;
100 
101   /** We don't ever expect to change this, the constant is just for clarity. */
102   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
103   public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
104       "hbase.storescanner.parallel.seek.enable";
105 
106   /** Used during unit testing to ensure that lazy seek does save seek ops */
107   protected static boolean lazySeekEnabledGlobally =
108       LAZY_SEEK_ENABLED_BY_DEFAULT;
109 
110   /**
111    * The number of cells scanned in between timeout checks. Specifying a larger value means that
112    * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
113    * timeout checks.
114    */
115   public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
116       "hbase.cells.scanned.per.heartbeat.check";
117 
118   /**
119    * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
120    */
121   public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
122 
123   // if heap == null and lastTop != null, you need to reseek given the key below
124   protected Cell lastTop = null;
125 
126   // A flag whether use pread for scan
127   private boolean scanUsePread = false;
128   protected ReentrantLock lock = new ReentrantLock();
129   
130   protected final long readPt;
131 
132   // Used by the injection framework to test the race between StoreScanner construction and
      // compaction. Not referenced by any other code visible in this file.
133   enum StoreScannerCompactionRace {
134     BEFORE_SEEK,
135     AFTER_SEEK,
136     COMPACT_COMPLETE
137   }
138 
139   /** An internal constructor. */
140   protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
141       final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
142     this.readPt = readPt;
143     this.store = store;
144     this.cacheBlocks = cacheBlocks;
145     get = scan.isGetScan();
146     int numCol = columns == null ? 0 : columns.size();
147     explicitColumnQuery = numCol > 0;
148     this.scan = scan;
149     this.columns = columns;
150     this.now = EnvironmentEdgeManager.currentTime();
151     this.oldestUnexpiredTS = now - scanInfo.getTtl();
152     this.minVersions = scanInfo.getMinVersions();
153 
154      // We look up row-column Bloom filters for multi-column queries as part of
155      // the seek operation. However, we also look the row-column Bloom filter
156      // for multi-row (non-"get") scans because this is not done in
157      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
158      this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
159 
160      this.maxRowSize = scanInfo.getTableMaxRowSize();
161      this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
162      this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
163      // Parallel seeking is on if the config allows and more there is more than one store file.
164      if (this.store != null && this.store.getStorefilesCount() > 1) {
165        RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
166        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
167          this.parallelSeekEnabled = true;
168          this.executor = rsService.getExecutorService();
169        }
170      }
171   }
172 
173   /**
174    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
175    * are not in a compaction.
176    *
177    * @param store who we scan
178    * @param scan the spec
179    * @param columns which columns we are scanning
180    * @throws IOException
181    */
182   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
183       long readPt)
184   throws IOException {
185     this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
186     if (columns != null && scan.isRaw()) {
187       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
188     }
189     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
190         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
191         oldestUnexpiredTS, now, store.getCoprocessorHost());
192 
193     this.store.addChangedReaderObserver(this);
194 
195     // Pass columns to try to filter out unnecessary StoreFiles.
196     List<KeyValueScanner> scanners = getScannersNoCompaction();
197 
198     // Seek all scanners to the start of the Row (or if the exact matching row
199     // key does not exist, then to the start of the next matching Row).
200     // Always check bloom filter to optimize the top row seek for delete
201     // family marker.
202     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
203         && lazySeekEnabledGlobally, parallelSeekEnabled);
204 
205     // set storeLimit
206     this.storeLimit = scan.getMaxResultsPerColumnFamily();
207 
208     // set rowOffset
209     this.storeOffset = scan.getRowOffsetPerColumnFamily();
210 
211     // Combine all seeked scanners with a heap
212     resetKVHeap(scanners, store.getComparator());
213   }
214 
/**
 * Used for compactions.<p>
 *
 * Opens a scanner across the specified StoreFiles, without a row range for dropping deletes.
 * @param store who we scan
 * @param scanInfo per-store scan settings
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param scanType the type of scan handed to the query matcher
 * @param smallestReadPoint the readPoint that we should use for tracking
 *          versions
 * @param earliestPutTs passed through to the query matcher
 * @throws IOException if the scanners cannot be sought
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  // Both drop-deletes bounds are null: no limited row range applies.
  this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs,
      null, null);
}
230 
/**
 * Used for compactions that drop deletes from a limited range of rows.<p>
 *
 * Opens a scanner across the specified StoreFiles using
 * {@link ScanType#COMPACT_RETAIN_DELETES}.
 * @param store who we scan
 * @param scanInfo per-store scan settings
 * @param scan the spec
 * @param scanners ancillary scanners
 * @param smallestReadPoint the readPoint that we should use for tracking versions
 * @param earliestPutTs passed through to the query matcher
 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
 * @throws IOException if the scanners cannot be sought
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners,
      ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
248 
249   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
250       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
251       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
252     this(store, scan, scanInfo, null,
253       ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
254     if (dropDeletesFromRow == null) {
255       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
256           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
257     } else {
258       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
259           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
260     }
261 
262     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
263     scanners = selectScannersFrom(scanners);
264 
265     // Seek all scanners to the initial key
266     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
267 
268     // Combine all seeked scanners with a heap
269     resetKVHeap(scanners, store.getComparator());
270   }
271 
/**
 * Test-only constructor that uses {@link HConstants#LATEST_TIMESTAMP} as the earliest put
 * timestamp and 0 as the read point.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  // The read point is 0 because the test bypasses Store.
  this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP, 0);
}
281 
/**
 * Test-only constructor that takes an explicit earliest put timestamp and uses 0 as the
 * read point.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  // The read point is 0 because the test bypasses Store.
  this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, 0);
}
291 
292   public StoreScanner(final Scan scan, ScanInfo scanInfo,
293       ScanType scanType, final NavigableSet<byte[]> columns,
294       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
295   throws IOException {
296     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
297     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
298         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
299 
300     // In unit tests, the store could be null
301     if (this.store != null) {
302       this.store.addChangedReaderObserver(this);
303     }
304     // Seek all scanners to the initial key
305     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
306     resetKVHeap(scanners, scanInfo.getComparator());
307   }
308 
309   /**
310    * Get a filtered list of scanners. Assumes we are not in a compaction.
311    * @return list of scanners to seek
312    */
313   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
314     final boolean isCompaction = false;
315     boolean usePread = get || scanUsePread;
316     return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
317         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
318   }
319 
320   /**
321    * Seek the specified scanners with the given key
322    * @param scanners
323    * @param seekKey
324    * @param isLazy true if using lazy seek
325    * @param isParallelSeek true if using parallel seek
326    * @throws IOException
327    */
328   protected void seekScanners(List<? extends KeyValueScanner> scanners,
329       Cell seekKey, boolean isLazy, boolean isParallelSeek)
330       throws IOException {
331     // Seek all scanners to the start of the Row (or if the exact matching row
332     // key does not exist, then to the start of the next matching Row).
333     // Always check bloom filter to optimize the top row seek for delete
334     // family marker.
335     if (isLazy) {
336       for (KeyValueScanner scanner : scanners) {
337         scanner.requestSeek(seekKey, false, true);
338       }
339     } else {
340       if (!isParallelSeek) {
341         long totalScannersSoughtBytes = 0;
342         for (KeyValueScanner scanner : scanners) {
343           if (totalScannersSoughtBytes >= maxRowSize) {
344             throw new RowTooBigException("Max row size allowed: " + maxRowSize
345               + ", but row is bigger than that");
346           }
347           scanner.seek(seekKey);
348           Cell c = scanner.peek();
349           if (c != null) {
350             totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
351           }
352         }
353       } else {
354         parallelSeek(scanners, seekKey);
355       }
356     }
357   }
358 
359   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
360       CellComparator comparator) throws IOException {
361     // Combine all seeked scanners with a heap
362     heap = new KeyValueHeap(scanners, comparator);
363   }
364 
365   /**
366    * Filters the given list of scanners using Bloom filter, time range, and
367    * TTL.
368    */
369   protected List<KeyValueScanner> selectScannersFrom(
370       final List<? extends KeyValueScanner> allScanners) {
371     boolean memOnly;
372     boolean filesOnly;
373     if (scan instanceof InternalScan) {
374       InternalScan iscan = (InternalScan)scan;
375       memOnly = iscan.isCheckOnlyMemStore();
376       filesOnly = iscan.isCheckOnlyStoreFiles();
377     } else {
378       memOnly = false;
379       filesOnly = false;
380     }
381 
382     List<KeyValueScanner> scanners =
383         new ArrayList<KeyValueScanner>(allScanners.size());
384 
385     // We can only exclude store files based on TTL if minVersions is set to 0.
386     // Otherwise, we might have to return KVs that have technically expired.
387     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
388         Long.MIN_VALUE;
389 
390     // include only those scan files which pass all filters
391     for (KeyValueScanner kvs : allScanners) {
392       boolean isFile = kvs.isFileScanner();
393       if ((!isFile && filesOnly) || (isFile && memOnly)) {
394         continue;
395       }
396 
397       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
398         scanners.add(kvs);
399       }
400     }
401     return scanners;
402   }
403 
404   @Override
405   public Cell peek() {
406     lock.lock();
407     try {
408     if (this.heap == null) {
409       return this.lastTop;
410     }
411     return this.heap.peek();
412     } finally {
413       lock.unlock();
414     }
415   }
416 
417   @Override
418   public KeyValue next() {
419     // throw runtime exception perhaps?
420     throw new RuntimeException("Never call StoreScanner.next()");
421   }
422 
423   @Override
424   public void close() {
425     close(true);
426   }
427 
428   private void close(boolean withHeapClose){
429     lock.lock();
430     try {
431       if (this.closing) {
432         return;
433       }
434       if (withHeapClose) this.closing = true;
435       // Under test, we dont have a this.store
436       if (this.store != null) this.store.deleteChangedReaderObserver(this);
437       if (withHeapClose) {
438         for (KeyValueHeap h : this.heapsForDelayedClose) {
439           h.close();
440         }
441         this.heapsForDelayedClose.clear();
442         if (this.heap != null) {
443           this.heap.close();
444           this.heap = null; // CLOSED!
445         }
446       } else {
447         if (this.heap != null) {
448           this.heapsForDelayedClose.add(this.heap);
449           this.heap = null;
450         }
451       }
452       this.lastTop = null; // If both are null, we are closed.
453     } finally {
454       lock.unlock();
455     }
456   }
457 
458   @Override
459   public boolean seek(Cell key) throws IOException {
460     lock.lock();
461     try {
462     // reset matcher state, in case that underlying store changed
463     checkReseek();
464     return this.heap.seek(key);
465     } finally {
466       lock.unlock();
467     }
468   }
469 
470   @Override
471   public boolean next(List<Cell> outResult) throws IOException {
472     return next(outResult, NoLimitScannerContext.getInstance());
473   }
474 
475   /**
476    * Get the next row of values from this Store.
       * <p>
       * Cells accepted by the query matcher are appended to {@code outResult} until the row is
       * finished, the scan is finished, or a batch/size/time limit of {@code scannerContext} is
       * reached. The outcome is recorded in the context via {@code setScannerState} and the
       * return value is whatever {@code hasMoreValues()} reports for that state.
477    * @param outResult list that included cells are appended to
478    * @param scannerContext carries the limits and progress of the scan; must not be null
479    * @return true if there are more rows, false if scanner is done
       * @throws IllegalArgumentException if {@code scannerContext} is null
       * @throws RowTooBigException if the cells added by this call exceed the maximum row size
       * @throws IOException if reading from or seeking the underlying scanners fails
480    */
481   @Override
482   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
483     lock.lock();
484     try {
485       if (scannerContext == null) {
486         throw new IllegalArgumentException("Scanner context cannot be null");
487       }
          // If the scanner stack had to be rebuilt and the top moved to another row, return
          // without adding anything so that the caller retries.
488       if (checkReseek()) {
489         return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
490       }
491 
492       // if the heap was left null, then the scanners had previously run out anyways, close and
493       // return.
494       if (this.heap == null) {
495         // By this time partial close should happened because already heap is null
496         close(false);// Do all cleanup except heap.close()
497         return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
498       }
499 
500       Cell cell = this.heap.peek();
501       if (cell == null) {
502         close(false);// Do all cleanup except heap.close()
503         return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
504       }
505 
506       // only call setRow if the row changes; avoids confusing the query matcher
507       // if scanning intra-row
508 
509       // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
510       // rows. Else it is possible we are still traversing the same row so we must perform the row
511       // comparison.
512       if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
513           !CellUtil.matchingRow(cell, matcher.curCell)) {
514         this.countPerRow = 0;
515         matcher.setToNewRow(cell);
516       }
517 
518       // Clear progress away unless invoker has indicated it should be kept.
519       if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
520 
521       // Only do a sanity-check if store and comparator are available.
522       CellComparator comparator = store != null ? store.getComparator() : null;
523 
          // Number of cells added to outResult by this call, and their estimated serialized size.
524       int count = 0;
525       long totalBytesRead = 0;
526 
527       LOOP: do {
528         // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
529         if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
530           scannerContext.updateTimeProgress();
531           if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
532             return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
533           }
534         }
535 
536         if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
537         checkScanOrder(prevCell, cell, comparator);
538         prevCell = cell;
539 
            // Ask the matcher what to do with this cell, then let optimize() downgrade a seek to
            // a plain skip/include where that is expected to be cheaper.
540         ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
541         qcode = optimize(qcode, cell);
542         switch(qcode) {
543         case INCLUDE:
544         case INCLUDE_AND_SEEK_NEXT_ROW:
545         case INCLUDE_AND_SEEK_NEXT_COL:
546 
              // Give the filter, if any, the chance to replace the cell before it is returned.
547           Filter f = matcher.getFilter();
548           if (f != null) {
549             cell = f.transformCell(cell);
550           }
551 
552           this.countPerRow++;
              // Once the per-row limit (plus offset) is exceeded, abandon the rest of this row.
553           if (storeLimit > -1 &&
554               this.countPerRow > (storeLimit + storeOffset)) {
555             // do what SEEK_NEXT_ROW does.
556             if (!matcher.moreRowsMayExistAfter(cell)) {
557               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
558             }
559             seekToNextRow(cell);
560             break LOOP;
561           }
562 
563           // add to results only if we have skipped #storeOffset kvs
564           // also update metric accordingly
565           if (this.countPerRow > storeOffset) {
566             outResult.add(cell);
567 
568             // Update local tracking information
569             count++;
570             totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
571 
572             // Update the progress of the scanner context
573             scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
574             scannerContext.incrementBatchProgress(1);
575 
576             if (totalBytesRead > maxRowSize) {
577               throw new RowTooBigException("Max row size allowed: " + maxRowSize
578                   + ", but the row is bigger than that.");
579             }
580           }
581 
              // Advance the heap according to the match code before checking the limits.
582           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
583             if (!matcher.moreRowsMayExistAfter(cell)) {
584               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
585             }
586             seekToNextRow(cell);
587           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
588             seekAsDirection(matcher.getKeyForNextColumn(cell));
589           } else {
590             this.heap.next();
591           }
592 
593           if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
594             break LOOP;
595           }
596           if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
597             break LOOP;
598           }
599           continue;
600 
601         case DONE:
602           return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
603 
604         case DONE_SCAN:
605           close(false);// Do all cleanup except heap.close()
606           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
607 
608         case SEEK_NEXT_ROW:
609           // This is just a relatively simple end of scan fix, to short-cut end
610           // us if there is an endKey in the scan.
611           if (!matcher.moreRowsMayExistAfter(cell)) {
612             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
613           }
614 
615           seekToNextRow(cell);
616           break;
617 
618         case SEEK_NEXT_COL:
619           seekAsDirection(matcher.getKeyForNextColumn(cell));
620           break;
621 
622         case SKIP:
623           this.heap.next();
624           break;
625 
626         case SEEK_NEXT_USING_HINT:
                // Seek to the hinted key if the matcher provides one, otherwise just step ahead.
627           Cell nextKV = matcher.getNextKeyHint(cell);
628           if (nextKV != null) {
629             seekAsDirection(nextKV);
630           } else {
631             heap.next();
632           }
633           break;
634 
635         default:
636           throw new RuntimeException("UNEXPECTED");
637         }
638       } while((cell = this.heap.peek()) != null);
639 
          // Reached when the heap ran dry or a limit / per-row limit broke out of the loop.
640       if (count > 0) {
641         return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
642       }
643 
644       // No more keys
645       close(false);// Do all cleanup except heap.close()
646       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
647     } finally {
648       lock.unlock();
649     }
650   }
651 
652   /*
653    * See if we should actually SEEK or rather just SKIP to the next Cell.
654    * (see HBASE-13109)
655    */
656   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
657     switch(qcode) {
658     case INCLUDE_AND_SEEK_NEXT_COL:
659     case SEEK_NEXT_COL:
660     {
661       Cell nextIndexedKey = getNextIndexedKey();
662       if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
663           && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
664         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
665       }
666       break;
667     }
668     case INCLUDE_AND_SEEK_NEXT_ROW:
669     case SEEK_NEXT_ROW:
670     {
671       Cell nextIndexedKey = getNextIndexedKey();
672       if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
673           && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
674         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
675       }
676       break;
677     }
678     default:
679       break;
680     }
681     return qcode;
682   }
683 
684   // Implementation of ChangedReadersObserver
685   @Override
686   public void updateReaders() throws IOException {
687     lock.lock();
688     try {
689     if (this.closing) return;
690 
691     // All public synchronized API calls will call 'checkReseek' which will cause
692     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
693     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
694     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
695     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
696     if (this.heap == null) return;
697 
698     // this could be null.
699     this.lastTop = this.peek();
700 
701     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
702 
703     // close scanners to old obsolete Store files
704     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
705     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
706 
707     // Let the next() call handle re-creating and seeking
708     } finally {
709       lock.unlock();
710     }
711   }
712 
713   /**
714    * @return true if top of heap has changed (and KeyValueHeap has to try the
715    *         next KV)
716    * @throws IOException
717    */
718   protected boolean checkReseek() throws IOException {
719     if (this.heap == null && this.lastTop != null) {
720       resetScannerStack(this.lastTop);
721       if (this.heap.peek() == null
722           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
723         LOG.debug("Storescanner.peek() is changed where before = "
724             + this.lastTop.toString() + ",and after = " + this.heap.peek());
725         this.lastTop = null;
726         return true;
727       }
728       this.lastTop = null; // gone!
729     }
730     // else dont need to reseek
731     return false;
732   }
733 
734   protected void resetScannerStack(Cell lastTopKey) throws IOException {
735     if (heap != null) {
736       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
737     }
738 
739     /* When we have the scan object, should we not pass it to getScanners()
740      * to get a limited set of scanners? We did so in the constructor and we
741      * could have done it now by storing the scan object from the constructor */
742     List<KeyValueScanner> scanners = getScannersNoCompaction();
743 
744     // Seek all scanners to the initial key
745     seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
746 
747     // Combine all seeked scanners with a heap
748     resetKVHeap(scanners, store.getComparator());
749 
750     // Reset the state of the Query Matcher and set to top row.
751     // Only reset and call setRow if the row changes; avoids confusing the
752     // query matcher if scanning intra-row.
753     Cell cell = heap.peek();
754     if (cell == null) {
755       cell = lastTopKey;
756     }
757     if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
758       this.countPerRow = 0;
759       matcher.reset();
760       matcher.setToNewRow(cell);
761     }
762   }
763 
764   /**
765    * Check whether scan as expected order
766    * @param prevKV
767    * @param kv
768    * @param comparator
769    * @throws IOException
770    */
771   protected void checkScanOrder(Cell prevKV, Cell kv,
772       CellComparator comparator) throws IOException {
773     // Check that the heap gives us KVs in an increasing order.
774     assert prevKV == null || comparator == null
775         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
776         + " followed by a " + "smaller key " + kv + " in cf " + store;
777   }
778 
779   protected boolean seekToNextRow(Cell c) throws IOException {
780     return reseek(CellUtil.createLastOnRow(c));
781   }
782 
783   /**
784    * Do a reseek in a normal StoreScanner(scan forward)
785    * @param kv
786    * @return true if scanner has values left, false if end of scanner
787    * @throws IOException
788    */
789   protected boolean seekAsDirection(Cell kv)
790       throws IOException {
791     return reseek(kv);
792   }
793 
794   @Override
795   public boolean reseek(Cell kv) throws IOException {
796     lock.lock();
797     try {
798     //Heap will not be null, if this is called from next() which.
799     //If called from RegionScanner.reseek(...) make sure the scanner
800     //stack is reset if needed.
801     checkReseek();
802     if (explicitColumnQuery && lazySeekEnabledGlobally) {
803       return heap.requestSeek(kv, true, useRowColBloom);
804     }
805     return heap.reseek(kv);
806     } finally {
807       lock.unlock();
808     }
809   }
810 
811   @Override
812   public long getSequenceID() {
813     return 0;
814   }
815 
816   /**
817    * Seek storefiles in parallel to optimize IO latency as much as possible
818    * @param scanners the list {@link KeyValueScanner}s to be read from
819    * @param kv the KeyValue on which the operation is being requested
820    * @throws IOException
821    */
822   private void parallelSeek(final List<? extends KeyValueScanner>
823       scanners, final Cell kv) throws IOException {
824     if (scanners.isEmpty()) return;
825     int storeFileScannerCount = scanners.size();
826     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
827     List<ParallelSeekHandler> handlers = 
828         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
829     for (KeyValueScanner scanner : scanners) {
830       if (scanner instanceof StoreFileScanner) {
831         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
832           this.readPt, latch);
833         executor.submit(seekHandler);
834         handlers.add(seekHandler);
835       } else {
836         scanner.seek(kv);
837         latch.countDown();
838       }
839     }
840 
841     try {
842       latch.await();
843     } catch (InterruptedException ie) {
844       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
845     }
846 
847     for (ParallelSeekHandler handler : handlers) {
848       if (handler.getErr() != null) {
849         throw new IOException(handler.getErr());
850       }
851     }
852   }
853 
854   /**
855    * Used in testing.
856    * @return all scanners in no particular order
857    */
858   List<KeyValueScanner> getAllScannersForTesting() {
859     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
860     KeyValueScanner current = heap.getCurrentForTesting();
861     if (current != null)
862       allScanners.add(current);
863     for (KeyValueScanner scanner : heap.getHeap())
864       allScanners.add(scanner);
865     return allScanners;
866   }
867 
868   static void enableLazySeekGlobally(boolean enable) {
869     lazySeekEnabledGlobally = enable;
870   }
871 
872   /**
873    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
874    */
875   public long getEstimatedNumberOfKvsScanned() {
876     return this.kvsScanned;
877   }
878 
879   @Override
880   public Cell getNextIndexedKey() {
881     return this.heap.getNextIndexedKey();
882   }
883 
884   @Override
885   public void shipped() throws IOException {
886     lock.lock();
887     try {
888       for (KeyValueHeap h : this.heapsForDelayedClose) {
889         h.close();// There wont be further fetch of Cells from these scanners. Just close.
890       }
891       this.heapsForDelayedClose.clear();
892       if (this.heap != null) {
893         this.heap.shipped();
894       }
895     } finally {
896       lock.unlock();
897     }
898   }
899 }
900