1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.Comparator;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.PriorityQueue;
27 import java.util.Set;
28
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
33
34
35
36
37
38
39
40
41
42
43
44
45
46 @InterfaceAudience.Private
47 public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
48 implements KeyValueScanner, InternalScanner {
49 protected PriorityQueue<KeyValueScanner> heap = null;
50
51
52
53 protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
54
55
56
57
58
59
60
61
62
63
64
65 protected KeyValueScanner current = null;
66
67 protected KVScannerComparator comparator;
68
69
70
71
72
73
74
/**
 * Creates a heap that merges the given sub-scanners, ordering cells with {@code comparator}.
 *
 * @param scanners sub-scanners to merge
 * @param comparator cell ordering; it is wrapped in a {@link KVScannerComparator}
 * @throws IOException if the delegated constructor fails while selecting the first scanner
 */
public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
  this(scanners, new KVScannerComparator(comparator));
}
79
80
81
82
83
84
85
/**
 * Creates a heap that merges the given sub-scanners using an already built scanner comparator.
 * Sub-scanners with no cell to offer are parked for delayed close rather than queued.
 *
 * @param scanners sub-scanners to merge; when empty, {@code heap} and {@code current} stay null
 * @param comparator ordering for the sub-scanners
 * @throws IOException if picking the first scanner via {@link #pollRealKV()} fails
 */
KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
    throws IOException {
  this.comparator = comparator;
  if (scanners.isEmpty()) {
    // Nothing to merge.
    return;
  }

  heap = new PriorityQueue<KeyValueScanner>(scanners.size(), this.comparator);
  for (KeyValueScanner candidate : scanners) {
    if (candidate.peek() == null) {
      // Already exhausted: never enters the queue, only needs closing later.
      scannersForDelayedClose.add(candidate);
    } else {
      heap.add(candidate);
    }
  }
  current = pollRealKV();
}
102
103 public Cell peek() {
104 if (this.current == null) {
105 return null;
106 }
107 return this.current.peek();
108 }
109
110 public Cell next() throws IOException {
111 if(this.current == null) {
112 return null;
113 }
114 Cell kvReturn = this.current.next();
115 Cell kvNext = this.current.peek();
116 if (kvNext == null) {
117 this.scannersForDelayedClose.add(this.current);
118 this.current = null;
119 this.current = pollRealKV();
120 } else {
121 KeyValueScanner topScanner = this.heap.peek();
122
123 if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
124 this.heap.add(this.current);
125 this.current = null;
126 this.current = pollRealKV();
127 }
128 }
129 return kvReturn;
130 }
131
132
133
134
135
136
137
138
139
140
141
142 @Override
143 public boolean next(List<Cell> result) throws IOException {
144 return next(result, NoLimitScannerContext.getInstance());
145 }
146
147 @Override
148 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
149 if (this.current == null) {
150 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
151 }
152 InternalScanner currentAsInternal = (InternalScanner)this.current;
153 boolean moreCells = currentAsInternal.next(result, scannerContext);
154 Cell pee = this.current.peek();
155
156
157
158
159
160
161
162
163
164 if (pee == null || !moreCells) {
165
166 this.scannersForDelayedClose.add(this.current);
167 } else {
168 this.heap.add(this.current);
169 }
170 this.current = null;
171 this.current = pollRealKV();
172 if (this.current == null) {
173 moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
174 }
175 return moreCells;
176 }
177
178 protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
179 protected CellComparator kvComparator;
180
181
182
183
184 public KVScannerComparator(CellComparator kvComparator) {
185 this.kvComparator = kvComparator;
186 }
187 public int compare(KeyValueScanner left, KeyValueScanner right) {
188 int comparison = compare(left.peek(), right.peek());
189 if (comparison != 0) {
190 return comparison;
191 } else {
192
193
194 long leftSequenceID = left.getSequenceID();
195 long rightSequenceID = right.getSequenceID();
196 if (leftSequenceID > rightSequenceID) {
197 return -1;
198 } else if (leftSequenceID < rightSequenceID) {
199 return 1;
200 } else {
201 return 0;
202 }
203 }
204 }
205
206
207
208
209
210
211 public int compare(Cell left, Cell right) {
212 return this.kvComparator.compare(left, right);
213 }
214
215
216
217 public CellComparator getComparator() {
218 return this.kvComparator;
219 }
220 }
221
222 public void close() {
223 for (KeyValueScanner scanner : this.scannersForDelayedClose) {
224 scanner.close();
225 }
226 this.scannersForDelayedClose.clear();
227 if (this.current != null) {
228 this.current.close();
229 }
230 if (this.heap != null) {
231 KeyValueScanner scanner;
232 while ((scanner = this.heap.poll()) != null) {
233 scanner.close();
234 }
235 }
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253 @Override
254 public boolean seek(Cell seekKey) throws IOException {
255 return generalizedSeek(false,
256 seekKey,
257 false,
258 false);
259 }
260
261
262
263
264
265 @Override
266 public boolean reseek(Cell seekKey) throws IOException {
267 return generalizedSeek(false,
268 seekKey,
269 true,
270 false);
271 }
272
273
274
275
276 @Override
277 public boolean requestSeek(Cell key, boolean forward,
278 boolean useBloom) throws IOException {
279 return generalizedSeek(true, key, forward, useBloom);
280 }
281
282
283
284
285
286
287
288
289
290 private boolean generalizedSeek(boolean isLazy, Cell seekKey,
291 boolean forward, boolean useBloom) throws IOException {
292 if (!isLazy && useBloom) {
293 throw new IllegalArgumentException("Multi-column Bloom filter " +
294 "optimization requires a lazy seek");
295 }
296
297 if (current == null) {
298 return false;
299 }
300 heap.add(current);
301 current = null;
302
303 KeyValueScanner scanner;
304 while ((scanner = heap.poll()) != null) {
305 Cell topKey = scanner.peek();
306 if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
307
308
309
310
311
312
313 heap.add(scanner);
314 current = pollRealKV();
315 return current != null;
316 }
317
318 boolean seekResult;
319 if (isLazy && heap.size() > 0) {
320
321 seekResult = scanner.requestSeek(seekKey, forward, useBloom);
322 } else {
323 seekResult = NonLazyKeyValueScanner.doRealSeek(
324 scanner, seekKey, forward);
325 }
326
327 if (!seekResult) {
328 this.scannersForDelayedClose.add(scanner);
329 } else {
330 heap.add(scanner);
331 }
332 }
333
334
335 return false;
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349 protected KeyValueScanner pollRealKV() throws IOException {
350 KeyValueScanner kvScanner = heap.poll();
351 if (kvScanner == null) {
352 return null;
353 }
354
355 while (kvScanner != null && !kvScanner.realSeekDone()) {
356 if (kvScanner.peek() != null) {
357 try {
358 kvScanner.enforceSeek();
359 } catch (IOException ioe) {
360
361 this.scannersForDelayedClose.add(kvScanner);
362 throw ioe;
363 }
364 Cell curKV = kvScanner.peek();
365 if (curKV != null) {
366 KeyValueScanner nextEarliestScanner = heap.peek();
367 if (nextEarliestScanner == null) {
368
369 return kvScanner;
370 }
371
372
373
374 Cell nextKV = nextEarliestScanner.peek();
375 if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
376
377 return kvScanner;
378 }
379
380
381
382
383 heap.add(kvScanner);
384 } else {
385
386
387 this.scannersForDelayedClose.add(kvScanner);
388 }
389 } else {
390
391
392 this.scannersForDelayedClose.add(kvScanner);
393 }
394 kvScanner = heap.poll();
395 }
396
397 return kvScanner;
398 }
399
400
401
402
403 public PriorityQueue<KeyValueScanner> getHeap() {
404 return this.heap;
405 }
406
407 @Override
408 public long getSequenceID() {
409 return 0;
410 }
411
412 KeyValueScanner getCurrentForTesting() {
413 return current;
414 }
415
416 @Override
417 public Cell getNextIndexedKey() {
418
419 return current == null ? null : current.getNextIndexedKey();
420 }
421
422 @Override
423 public void shipped() throws IOException {
424 for (KeyValueScanner scanner : this.scannersForDelayedClose) {
425 scanner.close();
426 }
427 this.scannersForDelayedClose.clear();
428 if (this.current != null) {
429 this.current.shipped();
430 }
431 if (this.heap != null) {
432 for (KeyValueScanner scanner : this.heap) {
433 scanner.shipped();
434 }
435 }
436 }
437 }