1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.NavigableSet;
28 import java.util.Set;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.locks.ReentrantLock;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellComparator;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.DoNotRetryIOException;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.IsolationLevel;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.executor.ExecutorService;
44 import org.apache.hadoop.hbase.filter.Filter;
45 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
46 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
47 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
48 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60 private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61
62 protected final Store store;
63 protected ScanQueryMatcher matcher;
64 protected KeyValueHeap heap;
65 protected boolean cacheBlocks;
66
67 protected int countPerRow = 0;
68 protected int storeLimit = -1;
69 protected int storeOffset = 0;
70
71
72
73 protected boolean closing = false;
74 protected final boolean get;
75 protected final boolean explicitColumnQuery;
76 protected final boolean useRowColBloom;
77
78
79
80 protected boolean parallelSeekEnabled = false;
81 protected ExecutorService executor;
82 protected final Scan scan;
83 protected final NavigableSet<byte[]> columns;
84 protected final long oldestUnexpiredTS;
85 protected final long now;
86 protected final int minVersions;
87 protected final long maxRowSize;
88 protected final long cellsPerHeartbeatCheck;
89
90
91
92 protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
93
94
95
96
97
98 private long kvsScanned = 0;
99 private Cell prevCell = null;
100
101
102 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
103 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
104 "hbase.storescanner.parallel.seek.enable";
105
106
107 protected static boolean lazySeekEnabledGlobally =
108 LAZY_SEEK_ENABLED_BY_DEFAULT;
109
110
111
112
113
114
115 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
116 "hbase.cells.scanned.per.heartbeat.check";
117
118
119
120
121 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
122
123
124 protected Cell lastTop = null;
125
126
127 private boolean scanUsePread = false;
128 protected ReentrantLock lock = new ReentrantLock();
129
130 protected final long readPt;
131
132
/**
 * Names three points relative to a seek and a compaction.
 * NOTE(review): not referenced anywhere in this file; presumably consumed by tests or
 * subclasses that simulate a scanner/compaction race - confirm against callers.
 */
enum StoreScannerCompactionRace { BEFORE_SEEK, AFTER_SEEK, COMPACT_COMPLETE }
138
139
140 protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
141 final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
142 this.readPt = readPt;
143 this.store = store;
144 this.cacheBlocks = cacheBlocks;
145 get = scan.isGetScan();
146 int numCol = columns == null ? 0 : columns.size();
147 explicitColumnQuery = numCol > 0;
148 this.scan = scan;
149 this.columns = columns;
150 this.now = EnvironmentEdgeManager.currentTime();
151 this.oldestUnexpiredTS = now - scanInfo.getTtl();
152 this.minVersions = scanInfo.getMinVersions();
153
154
155
156
157
158 this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
159
160 this.maxRowSize = scanInfo.getTableMaxRowSize();
161 this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
162 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
163
164 if (this.store != null && this.store.getStorefilesCount() > 1) {
165 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
166 if (rsService != null && scanInfo.isParallelSeekEnabled()) {
167 this.parallelSeekEnabled = true;
168 this.executor = rsService.getExecutorService();
169 }
170 }
171 }
172
173
174
175
176
177
178
179
180
181
182 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
183 long readPt)
184 throws IOException {
185 this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
186 if (columns != null && scan.isRaw()) {
187 throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
188 }
189 matcher = new ScanQueryMatcher(scan, scanInfo, columns,
190 ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
191 oldestUnexpiredTS, now, store.getCoprocessorHost());
192
193 this.store.addChangedReaderObserver(this);
194
195
196 List<KeyValueScanner> scanners = getScannersNoCompaction();
197
198
199
200
201
202 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
203 && lazySeekEnabledGlobally, parallelSeekEnabled);
204
205
206 this.storeLimit = scan.getMaxResultsPerColumnFamily();
207
208
209 this.storeOffset = scan.getRowOffsetPerColumnFamily();
210
211
212 resetKVHeap(scanners, store.getComparator());
213 }
214
215
216
217
218
219
220
221
222
223
224
/**
 * Opens a scanner over an explicit list of scanners with the given scan type and no
 * drop-deletes row range (both range bounds are passed as {@code null}).
 *
 * @param store the store the scanners belong to
 * @param scanInfo store-level scan settings
 * @param scan the scan to execute
 * @param scanners candidate scanners; they are filtered before use
 * @param scanType scan type handed to the query matcher
 * @param smallestReadPoint forwarded to the query matcher
 * @param earliestPutTs forwarded to the query matcher
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, ScanType scanType,
    long smallestReadPoint, long earliestPutTs) throws IOException {
  this(store, scanInfo, scan, scanners, scanType,
      smallestReadPoint, earliestPutTs,
      null /* dropDeletesFromRow */, null /* dropDeletesToRow */);
}
230
231
232
233
234
235
236
237
238
239
240
241
/**
 * Opens a scanner over an explicit list of scanners using
 * {@code ScanType.COMPACT_RETAIN_DELETES} together with a drop-deletes row range.
 *
 * @param store the store the scanners belong to
 * @param scanInfo store-level scan settings
 * @param scan the scan to execute
 * @param scanners candidate scanners; they are filtered before use
 * @param smallestReadPoint forwarded to the query matcher
 * @param earliestPutTs forwarded to the query matcher
 * @param dropDeletesFromRow start of the drop-deletes range; when {@code null} the matcher is
 *          built without a range
 * @param dropDeletesToRow end of the drop-deletes range
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
    List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
    byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
  this(store, scanInfo, scan, scanners,
      ScanType.COMPACT_RETAIN_DELETES,
      smallestReadPoint, earliestPutTs,
      dropDeletesFromRow, dropDeletesToRow);
}
248
249 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
250 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
251 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
252 this(store, scan, scanInfo, null,
253 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
254 if (dropDeletesFromRow == null) {
255 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
256 earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
257 } else {
258 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
259 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
260 }
261
262
263 scanners = selectScannersFrom(scanners);
264
265
266 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
267
268
269 resetKVHeap(scanners, store.getComparator());
270 }
271
/**
 * Test-only convenience constructor: store-less scanner with
 * {@code earliestPutTs = HConstants.LATEST_TIMESTAMP} and a read point of 0.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners,
      HConstants.LATEST_TIMESTAMP /* earliestPutTs */,
      0 /* readPt */);
}
281
/**
 * Test-only convenience constructor: store-less scanner with a caller-supplied
 * {@code earliestPutTs} and a read point of 0.
 */
@VisibleForTesting
StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(scan, scanInfo, scanType, columns, scanners,
      earliestPutTs,
      0 /* readPt */);
}
291
/**
 * Builds a scanner that is not attached to any store: it runs directly over the supplied
 * scanners and orders cells with {@code scanInfo.getComparator()}.
 *
 * <p>The delegated constructor is invoked with a {@code null} store, so {@code this.store}
 * is always {@code null} here and there is no store to register with as a changed-reader
 * observer. The scanners are used as given (no filtering) and are sought eagerly.
 *
 * @param scan the scan to execute
 * @param scanInfo store-level scan settings; also supplies the cell comparator
 * @param scanType scan type handed to the query matcher
 * @param columns explicitly requested columns, or {@code null}
 * @param scanners the scanners to merge
 * @param earliestPutTs forwarded to the query matcher
 * @param readPt read point recorded on this scanner
 * @throws IOException if seeking the scanners fails
 */
public StoreScanner(final Scan scan, ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
    throws IOException {
  this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
  this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

  seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
  resetKVHeap(scanners, scanInfo.getComparator());
}
308
309
310
311
312
313 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
314 final boolean isCompaction = false;
315 boolean usePread = get || scanUsePread;
316 return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
317 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
318 }
319
320
321
322
323
324
325
326
327
328 protected void seekScanners(List<? extends KeyValueScanner> scanners,
329 Cell seekKey, boolean isLazy, boolean isParallelSeek)
330 throws IOException {
331
332
333
334
335 if (isLazy) {
336 for (KeyValueScanner scanner : scanners) {
337 scanner.requestSeek(seekKey, false, true);
338 }
339 } else {
340 if (!isParallelSeek) {
341 long totalScannersSoughtBytes = 0;
342 for (KeyValueScanner scanner : scanners) {
343 if (totalScannersSoughtBytes >= maxRowSize) {
344 throw new RowTooBigException("Max row size allowed: " + maxRowSize
345 + ", but row is bigger than that");
346 }
347 scanner.seek(seekKey);
348 Cell c = scanner.peek();
349 if (c != null) {
350 totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
351 }
352 }
353 } else {
354 parallelSeek(scanners, seekKey);
355 }
356 }
357 }
358
359 protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
360 CellComparator comparator) throws IOException {
361
362 heap = new KeyValueHeap(scanners, comparator);
363 }
364
365
366
367
368
369 protected List<KeyValueScanner> selectScannersFrom(
370 final List<? extends KeyValueScanner> allScanners) {
371 boolean memOnly;
372 boolean filesOnly;
373 if (scan instanceof InternalScan) {
374 InternalScan iscan = (InternalScan)scan;
375 memOnly = iscan.isCheckOnlyMemStore();
376 filesOnly = iscan.isCheckOnlyStoreFiles();
377 } else {
378 memOnly = false;
379 filesOnly = false;
380 }
381
382 List<KeyValueScanner> scanners =
383 new ArrayList<KeyValueScanner>(allScanners.size());
384
385
386
387 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
388 Long.MIN_VALUE;
389
390
391 for (KeyValueScanner kvs : allScanners) {
392 boolean isFile = kvs.isFileScanner();
393 if ((!isFile && filesOnly) || (isFile && memOnly)) {
394 continue;
395 }
396
397 if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
398 scanners.add(kvs);
399 }
400 }
401 return scanners;
402 }
403
404 @Override
405 public Cell peek() {
406 lock.lock();
407 try {
408 if (this.heap == null) {
409 return this.lastTop;
410 }
411 return this.heap.peek();
412 } finally {
413 lock.unlock();
414 }
415 }
416
417 @Override
418 public KeyValue next() {
419
420 throw new RuntimeException("Never call StoreScanner.next()");
421 }
422
423 @Override
424 public void close() {
425 close(true);
426 }
427
428 private void close(boolean withHeapClose){
429 lock.lock();
430 try {
431 if (this.closing) {
432 return;
433 }
434 if (withHeapClose) this.closing = true;
435
436 if (this.store != null) this.store.deleteChangedReaderObserver(this);
437 if (withHeapClose) {
438 for (KeyValueHeap h : this.heapsForDelayedClose) {
439 h.close();
440 }
441 this.heapsForDelayedClose.clear();
442 if (this.heap != null) {
443 this.heap.close();
444 this.heap = null;
445 }
446 } else {
447 if (this.heap != null) {
448 this.heapsForDelayedClose.add(this.heap);
449 this.heap = null;
450 }
451 }
452 this.lastTop = null;
453 } finally {
454 lock.unlock();
455 }
456 }
457
458 @Override
459 public boolean seek(Cell key) throws IOException {
460 lock.lock();
461 try {
462
463 checkReseek();
464 return this.heap.seek(key);
465 } finally {
466 lock.unlock();
467 }
468 }
469
470 @Override
471 public boolean next(List<Cell> outResult) throws IOException {
472 return next(outResult, NoLimitScannerContext.getInstance());
473 }
474
475
476
477
478
479
480
481 @Override
482 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
483 lock.lock();
484 try {
485 if (scannerContext == null) {
486 throw new IllegalArgumentException("Scanner context cannot be null");
487 }
488 if (checkReseek()) {
489 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
490 }
491
492
493
494 if (this.heap == null) {
495
496 close(false);
497 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
498 }
499
500 Cell cell = this.heap.peek();
501 if (cell == null) {
502 close(false);
503 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
504 }
505
506
507
508
509
510
511
512 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
513 !CellUtil.matchingRow(cell, matcher.curCell)) {
514 this.countPerRow = 0;
515 matcher.setToNewRow(cell);
516 }
517
518
519 if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
520
521
522 CellComparator comparator = store != null ? store.getComparator() : null;
523
524 int count = 0;
525 long totalBytesRead = 0;
526
527 LOOP: do {
528
529 if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
530 scannerContext.updateTimeProgress();
531 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
532 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
533 }
534 }
535
536 if (prevCell != cell) ++kvsScanned;
537 checkScanOrder(prevCell, cell, comparator);
538 prevCell = cell;
539
540 ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
541 qcode = optimize(qcode, cell);
542 switch(qcode) {
543 case INCLUDE:
544 case INCLUDE_AND_SEEK_NEXT_ROW:
545 case INCLUDE_AND_SEEK_NEXT_COL:
546
547 Filter f = matcher.getFilter();
548 if (f != null) {
549 cell = f.transformCell(cell);
550 }
551
552 this.countPerRow++;
553 if (storeLimit > -1 &&
554 this.countPerRow > (storeLimit + storeOffset)) {
555
556 if (!matcher.moreRowsMayExistAfter(cell)) {
557 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
558 }
559 seekToNextRow(cell);
560 break LOOP;
561 }
562
563
564
565 if (this.countPerRow > storeOffset) {
566 outResult.add(cell);
567
568
569 count++;
570 totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
571
572
573 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
574 scannerContext.incrementBatchProgress(1);
575
576 if (totalBytesRead > maxRowSize) {
577 throw new RowTooBigException("Max row size allowed: " + maxRowSize
578 + ", but the row is bigger than that.");
579 }
580 }
581
582 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
583 if (!matcher.moreRowsMayExistAfter(cell)) {
584 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
585 }
586 seekToNextRow(cell);
587 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
588 seekAsDirection(matcher.getKeyForNextColumn(cell));
589 } else {
590 this.heap.next();
591 }
592
593 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
594 break LOOP;
595 }
596 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
597 break LOOP;
598 }
599 continue;
600
601 case DONE:
602 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
603
604 case DONE_SCAN:
605 close(false);
606 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
607
608 case SEEK_NEXT_ROW:
609
610
611 if (!matcher.moreRowsMayExistAfter(cell)) {
612 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
613 }
614
615 seekToNextRow(cell);
616 break;
617
618 case SEEK_NEXT_COL:
619 seekAsDirection(matcher.getKeyForNextColumn(cell));
620 break;
621
622 case SKIP:
623 this.heap.next();
624 break;
625
626 case SEEK_NEXT_USING_HINT:
627 Cell nextKV = matcher.getNextKeyHint(cell);
628 if (nextKV != null) {
629 seekAsDirection(nextKV);
630 } else {
631 heap.next();
632 }
633 break;
634
635 default:
636 throw new RuntimeException("UNEXPECTED");
637 }
638 } while((cell = this.heap.peek()) != null);
639
640 if (count > 0) {
641 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
642 }
643
644
645 close(false);
646 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
647 } finally {
648 lock.unlock();
649 }
650 }
651
652
653
654
655
656 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
657 switch(qcode) {
658 case INCLUDE_AND_SEEK_NEXT_COL:
659 case SEEK_NEXT_COL:
660 {
661 Cell nextIndexedKey = getNextIndexedKey();
662 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
663 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
664 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
665 }
666 break;
667 }
668 case INCLUDE_AND_SEEK_NEXT_ROW:
669 case SEEK_NEXT_ROW:
670 {
671 Cell nextIndexedKey = getNextIndexedKey();
672 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
673 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
674 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
675 }
676 break;
677 }
678 default:
679 break;
680 }
681 return qcode;
682 }
683
684
685 @Override
686 public void updateReaders() throws IOException {
687 lock.lock();
688 try {
689 if (this.closing) return;
690
691
692
693
694
695
696 if (this.heap == null) return;
697
698
699 this.lastTop = this.peek();
700
701
702
703
704 this.heapsForDelayedClose.add(this.heap);
705 this.heap = null;
706
707
708 } finally {
709 lock.unlock();
710 }
711 }
712
713
714
715
716
717
718 protected boolean checkReseek() throws IOException {
719 if (this.heap == null && this.lastTop != null) {
720 resetScannerStack(this.lastTop);
721 if (this.heap.peek() == null
722 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
723 LOG.debug("Storescanner.peek() is changed where before = "
724 + this.lastTop.toString() + ",and after = " + this.heap.peek());
725 this.lastTop = null;
726 return true;
727 }
728 this.lastTop = null;
729 }
730
731 return false;
732 }
733
734 protected void resetScannerStack(Cell lastTopKey) throws IOException {
735 if (heap != null) {
736 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
737 }
738
739
740
741
742 List<KeyValueScanner> scanners = getScannersNoCompaction();
743
744
745 seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
746
747
748 resetKVHeap(scanners, store.getComparator());
749
750
751
752
753 Cell cell = heap.peek();
754 if (cell == null) {
755 cell = lastTopKey;
756 }
757 if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
758 this.countPerRow = 0;
759 matcher.reset();
760 matcher.setToNewRow(cell);
761 }
762 }
763
764
765
766
767
768
769
770
771 protected void checkScanOrder(Cell prevKV, Cell kv,
772 CellComparator comparator) throws IOException {
773
774 assert prevKV == null || comparator == null
775 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
776 + " followed by a " + "smaller key " + kv + " in cf " + store;
777 }
778
779 protected boolean seekToNextRow(Cell c) throws IOException {
780 return reseek(CellUtil.createLastOnRow(c));
781 }
782
783
784
785
786
787
788
789 protected boolean seekAsDirection(Cell kv)
790 throws IOException {
791 return reseek(kv);
792 }
793
794 @Override
795 public boolean reseek(Cell kv) throws IOException {
796 lock.lock();
797 try {
798
799
800
801 checkReseek();
802 if (explicitColumnQuery && lazySeekEnabledGlobally) {
803 return heap.requestSeek(kv, true, useRowColBloom);
804 }
805 return heap.reseek(kv);
806 } finally {
807 lock.unlock();
808 }
809 }
810
811 @Override
812 public long getSequenceID() {
813 return 0;
814 }
815
816
817
818
819
820
821
822 private void parallelSeek(final List<? extends KeyValueScanner>
823 scanners, final Cell kv) throws IOException {
824 if (scanners.isEmpty()) return;
825 int storeFileScannerCount = scanners.size();
826 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
827 List<ParallelSeekHandler> handlers =
828 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
829 for (KeyValueScanner scanner : scanners) {
830 if (scanner instanceof StoreFileScanner) {
831 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
832 this.readPt, latch);
833 executor.submit(seekHandler);
834 handlers.add(seekHandler);
835 } else {
836 scanner.seek(kv);
837 latch.countDown();
838 }
839 }
840
841 try {
842 latch.await();
843 } catch (InterruptedException ie) {
844 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
845 }
846
847 for (ParallelSeekHandler handler : handlers) {
848 if (handler.getErr() != null) {
849 throw new IOException(handler.getErr());
850 }
851 }
852 }
853
854
855
856
857
858 List<KeyValueScanner> getAllScannersForTesting() {
859 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
860 KeyValueScanner current = heap.getCurrentForTesting();
861 if (current != null)
862 allScanners.add(current);
863 for (KeyValueScanner scanner : heap.getHeap())
864 allScanners.add(scanner);
865 return allScanners;
866 }
867
868 static void enableLazySeekGlobally(boolean enable) {
869 lazySeekEnabledGlobally = enable;
870 }
871
872
873
874
875 public long getEstimatedNumberOfKvsScanned() {
876 return this.kvsScanned;
877 }
878
879 @Override
880 public Cell getNextIndexedKey() {
881 return this.heap.getNextIndexedKey();
882 }
883
884 @Override
885 public void shipped() throws IOException {
886 lock.lock();
887 try {
888 for (KeyValueHeap h : this.heapsForDelayedClose) {
889 h.close();
890 }
891 this.heapsForDelayedClose.clear();
892 if (this.heap != null) {
893 this.heap.shipped();
894 }
895 } finally {
896 lock.unlock();
897 }
898 }
899 }
900