1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.file.rfile;
18
19 import java.io.DataInput;
20 import java.io.DataInputStream;
21 import java.io.DataOutput;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Set;
36 import java.util.TreeMap;
37 import java.util.concurrent.atomic.AtomicBoolean;
38
39 import org.apache.accumulo.core.conf.AccumuloConfiguration;
40 import org.apache.accumulo.core.conf.Property;
41 import org.apache.accumulo.core.data.ArrayByteSequence;
42 import org.apache.accumulo.core.data.ByteSequence;
43 import org.apache.accumulo.core.data.Key;
44 import org.apache.accumulo.core.data.Range;
45 import org.apache.accumulo.core.data.Value;
46 import org.apache.accumulo.core.file.FileSKVIterator;
47 import org.apache.accumulo.core.file.FileSKVWriter;
48 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
49 import org.apache.accumulo.core.file.blockfile.ABlockReader;
50 import org.apache.accumulo.core.file.blockfile.ABlockWriter;
51 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
52 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
53 import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
54 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
55 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
56 import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
57 import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
58 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
59 import org.apache.accumulo.core.iterators.IterationInterruptedException;
60 import org.apache.accumulo.core.iterators.IteratorEnvironment;
61 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
62 import org.apache.accumulo.core.iterators.system.HeapIterator;
63 import org.apache.hadoop.io.Writable;
64 import org.apache.log4j.Logger;
65
66 public class RFile {
67
/** File name extension used for RFiles. */
public static final String EXTENSION = "rf";

private static final Logger log = Logger.getLogger(RFile.class);

/** Magic number written at the start of the "RFile.index" meta block and verified when it is read back. */
private static final int RINDEX_MAGIC = 0x20637474;

// Index format versions. The writer always emits RINDEX_VER_7; the reader also accepts 6, 4 and 3.
static final int RINDEX_VER_3 = 3;
static final int RINDEX_VER_4 = 4;
static final int RINDEX_VER_6 = 6;
static final int RINDEX_VER_7 = 7;

/** Holder for the nested reader/writer classes; never instantiated. */
private RFile() {}
80
81 private static class Count {
82 public Count(int i) {
83 this.count = i;
84 }
85
86 public Count(long count) {
87 this.count = count;
88 }
89
90 long count;
91 }
92
93 private static class LocalityGroupMetadata implements Writable {
94
95 private int startBlock;
96 private Key firstKey;
97 private Map<ByteSequence,Count> columnFamilies;
98
99 private boolean isDefaultLG = false;
100 private String name;
101 private Set<ByteSequence> previousColumnFamilies;
102
103 private MultiLevelIndex.BufferedWriter indexWriter;
104 private MultiLevelIndex.Reader indexReader;
105
106 public LocalityGroupMetadata(int version, BlockFileReader br) {
107 columnFamilies = new HashMap<ByteSequence,Count>();
108 indexReader = new MultiLevelIndex.Reader(br, version);
109 }
110
111 public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
112 this.startBlock = nextBlock;
113 isDefaultLG = true;
114 columnFamilies = new HashMap<ByteSequence,Count>();
115 previousColumnFamilies = pcf;
116
117 indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
118 }
119
120 public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
121 this.startBlock = nextBlock;
122 this.name = name;
123 isDefaultLG = false;
124 columnFamilies = new HashMap<ByteSequence,Count>();
125 for (ByteSequence cf : cfset) {
126 columnFamilies.put(cf, new Count(0));
127 }
128
129 indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
130 }
131
132 private Key getFirstKey() {
133 return firstKey;
134 }
135
136 private void setFirstKey(Key key) {
137 if (firstKey != null)
138 throw new IllegalStateException();
139 this.firstKey = new Key(key);
140 }
141
142 public void updateColumnCount(Key key) {
143
144 if (isDefaultLG && columnFamilies == null) {
145 if (previousColumnFamilies.size() > 0) {
146
147 ByteSequence cf = key.getColumnFamilyData();
148 if (previousColumnFamilies.contains(cf)) {
149 throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
150 }
151 }
152
153
154 return;
155 }
156
157 ByteSequence cf = key.getColumnFamilyData();
158 Count count = columnFamilies.get(cf);
159
160 if (count == null) {
161 if (!isDefaultLG) {
162 throw new IllegalArgumentException("invalid column family : " + cf);
163 }
164
165 if (previousColumnFamilies.contains(cf)) {
166 throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
167 }
168
169 if (columnFamilies.size() > Writer.MAX_CF_IN_DLG) {
170
171 columnFamilies = null;
172 return;
173 }
174 count = new Count(0);
175 columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
176
177 }
178
179 count.count++;
180
181 }
182
183 @Override
184 public void readFields(DataInput in) throws IOException {
185
186 isDefaultLG = in.readBoolean();
187 if (!isDefaultLG) {
188 name = in.readUTF();
189 }
190
191 startBlock = in.readInt();
192
193 int size = in.readInt();
194
195 if (size == -1) {
196 if (!isDefaultLG)
197 throw new IllegalStateException("Non default LG " + name + " does not have column families");
198
199 columnFamilies = null;
200 } else {
201 if (columnFamilies == null)
202 columnFamilies = new HashMap<ByteSequence,Count>();
203 else
204 columnFamilies.clear();
205
206 for (int i = 0; i < size; i++) {
207 int len = in.readInt();
208 byte cf[] = new byte[len];
209 in.readFully(cf);
210 long count = in.readLong();
211
212 columnFamilies.put(new ArrayByteSequence(cf), new Count(count));
213 }
214 }
215
216 if (in.readBoolean()) {
217 firstKey = new Key();
218 firstKey.readFields(in);
219 } else {
220 firstKey = null;
221 }
222
223 indexReader.readFields(in);
224 }
225
226 @Override
227 public void write(DataOutput out) throws IOException {
228
229 out.writeBoolean(isDefaultLG);
230 if (!isDefaultLG) {
231 out.writeUTF(name);
232 }
233
234 out.writeInt(startBlock);
235
236 if (isDefaultLG && columnFamilies == null) {
237
238 out.writeInt(-1);
239 } else {
240 out.writeInt(columnFamilies.size());
241
242 for (Entry<ByteSequence,Count> entry : columnFamilies.entrySet()) {
243 out.writeInt(entry.getKey().length());
244 out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
245 out.writeLong(entry.getValue().count);
246 }
247 }
248
249 out.writeBoolean(firstKey != null);
250 if (firstKey != null)
251 firstKey.write(out);
252
253 indexWriter.close(out);
254 }
255
256 public void printInfo() throws IOException {
257 PrintStream out = System.out;
258 out.println("Locality group : " + (isDefaultLG ? "<DEFAULT>" : name));
259 out.println("\tStart block : " + startBlock);
260 out.println("\tNum blocks : " + String.format("%,d", indexReader.size()));
261 TreeMap<Integer,Long> sizesByLevel = new TreeMap<Integer,Long>();
262 TreeMap<Integer,Long> countsByLevel = new TreeMap<Integer,Long>();
263 indexReader.getIndexInfo(sizesByLevel, countsByLevel);
264 for (Entry<Integer,Long> entry : sizesByLevel.descendingMap().entrySet()) {
265 out.println("\tIndex level " + entry.getKey() + " : "
266 + String.format("%,d bytes %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
267 }
268 out.println("\tFirst key : " + firstKey);
269
270 Key lastKey = null;
271 if (indexReader != null && indexReader.size() > 0) {
272 lastKey = indexReader.getLastKey();
273 }
274
275 out.println("\tLast key : " + lastKey);
276
277 long numKeys = 0;
278 IndexIterator countIter = indexReader.lookup(new Key());
279 while (countIter.hasNext()) {
280 numKeys += countIter.next().getNumEntries();
281 }
282
283 out.println("\tNum entries : " + String.format("%,d", numKeys));
284 out.println("\tColumn families : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
285 }
286
287 }
288
289 public static class Writer implements FileSKVWriter {
290
291 public static final int MAX_CF_IN_DLG = 1000;
292
293 private BlockFileWriter fileWriter;
294 private ABlockWriter blockWriter;
295
296
297 private long blockSize = 100000;
298 private int indexBlockSize;
299 private int entries = 0;
300
301 private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
302 private LocalityGroupMetadata currentLocalityGroup = null;
303 private int nextBlock = 0;
304
305 private Key lastKeyInBlock = null;
306
307 private boolean dataClosed = false;
308 private boolean closed = false;
309 private Key prevKey = new Key();
310 private boolean startedDefaultLocalityGroup = false;
311
312 private HashSet<ByteSequence> previousColumnFamilies;
313
314 public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
315 this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
316 }
317
318 public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
319 this.blockSize = blockSize;
320 this.indexBlockSize = indexBlockSize;
321 this.fileWriter = bfw;
322 this.blockWriter = null;
323 previousColumnFamilies = new HashSet<ByteSequence>();
324 }
325
326 @Override
327 public synchronized void close() throws IOException {
328
329 if (closed) {
330 return;
331 }
332
333 closeData();
334
335 ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
336
337 mba.writeInt(RINDEX_MAGIC);
338 mba.writeInt(RINDEX_VER_7);
339
340 if (currentLocalityGroup != null)
341 localityGroups.add(currentLocalityGroup);
342
343 mba.writeInt(localityGroups.size());
344
345 for (LocalityGroupMetadata lc : localityGroups) {
346 lc.write(mba);
347 }
348
349 mba.close();
350
351 fileWriter.close();
352
353 closed = true;
354 }
355
356 private void closeData() throws IOException {
357
358 if (dataClosed) {
359 return;
360 }
361
362 dataClosed = true;
363
364 if (blockWriter != null) {
365 closeBlock(lastKeyInBlock, true);
366 }
367 }
368
369 @Override
370 public void append(Key key, Value value) throws IOException {
371
372 if (dataClosed) {
373 throw new IllegalStateException("Cannont append, data closed");
374 }
375
376 if (key.compareTo(prevKey) < 0) {
377 throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
378 }
379
380 currentLocalityGroup.updateColumnCount(key);
381
382 if (currentLocalityGroup.getFirstKey() == null) {
383 currentLocalityGroup.setFirstKey(key);
384 }
385
386 if (blockWriter == null) {
387 blockWriter = fileWriter.prepareDataBlock();
388 } else if (blockWriter.getRawSize() > blockSize) {
389 closeBlock(prevKey, false);
390 blockWriter = fileWriter.prepareDataBlock();
391 }
392
393 RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
394
395 rk.write(blockWriter);
396 value.write(blockWriter);
397 entries++;
398
399 prevKey = new Key(key);
400 lastKeyInBlock = prevKey;
401
402 }
403
404 private void closeBlock(Key key, boolean lastBlock) throws IOException {
405 blockWriter.close();
406
407 if (lastBlock)
408 currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
409 else
410 currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
411
412 blockWriter = null;
413 lastKeyInBlock = null;
414 entries = 0;
415 nextBlock++;
416 }
417
418 @Override
419 public DataOutputStream createMetaStore(String name) throws IOException {
420 closeData();
421
422 return (DataOutputStream) fileWriter.prepareMetaBlock(name);
423 }
424
425 private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
426 if (dataClosed) {
427 throw new IllegalStateException("data closed");
428 }
429
430 if (startedDefaultLocalityGroup) {
431 throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
432 }
433
434 if (blockWriter != null) {
435 closeBlock(lastKeyInBlock, true);
436 }
437
438 if (currentLocalityGroup != null) {
439 localityGroups.add(currentLocalityGroup);
440 }
441
442 if (columnFamilies == null) {
443 startedDefaultLocalityGroup = true;
444 currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
445 } else {
446 if (!Collections.disjoint(columnFamilies, previousColumnFamilies)) {
447 HashSet<ByteSequence> overlap = new HashSet<ByteSequence>(columnFamilies);
448 overlap.retainAll(previousColumnFamilies);
449 throw new IllegalArgumentException("Column families over lap with previous locality group : " + overlap);
450 }
451 currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
452 previousColumnFamilies.addAll(columnFamilies);
453 }
454
455 prevKey = new Key();
456 }
457
458 @Override
459 public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
460 if (columnFamilies == null)
461 throw new NullPointerException();
462
463 _startNewLocalityGroup(name, columnFamilies);
464 }
465
466 @Override
467 public void startDefaultLocalityGroup() throws IOException {
468 _startNewLocalityGroup(null, null);
469 }
470
471 @Override
472 public boolean supportsLocalityGroups() {
473 return true;
474 }
475 }
476
/**
 * Iterates over the key/value pairs of a single locality group. Data blocks are located through the group's multi-level index and
 * decoded as relative keys. Only unfiltered seeks are supported: {@link #seek} rejects any column family filtering, which
 * {@link Reader} performs by choosing which locality groups to seek.
 */
private static class LocalityGroupReader implements FileSKVIterator {

  private BlockFileReader reader;
  private MultiLevelIndex.Reader index;
  private int blockCount;
  private Key firstKey;
  private int startBlock;
  private Map<ByteSequence,Count> columnFamilies;
  private boolean isDefaultLocalityGroup;
  private boolean closed = false;
  private int version;
  // when false, the current block's index key (its last key) is within the range end, so per-key end checks are skipped
  private boolean checkRange = true;

  private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
    this.firstKey = lgm.firstKey;
    this.index = lgm.indexReader;
    this.startBlock = lgm.startBlock;
    blockCount = index.size();
    this.columnFamilies = lgm.columnFamilies;
    this.isDefaultLocalityGroup = lgm.isDefaultLG;
    this.version = version;

    this.reader = reader;

  }

  /** Copies the shared, read-only state of {@code lgr}; iteration state and the interrupt flag are not copied. */
  public LocalityGroupReader(LocalityGroupReader lgr) {
    this.firstKey = lgr.firstKey;
    this.index = lgr.index;
    this.startBlock = lgr.startBlock;
    this.blockCount = lgr.blockCount;
    this.columnFamilies = lgr.columnFamilies;
    this.isDefaultLocalityGroup = lgr.isDefaultLocalityGroup;
    this.reader = lgr.reader;
    this.version = lgr.version;
  }

  /** Returns an iterator over all index entries of this group, starting from the smallest key. */
  Iterator<IndexEntry> getIndex() throws IOException {
    return index.lookup(new Key());
  }

  /** Marks this reader closed and closes the current data block, if any. */
  @Override
  public void close() throws IOException {
    closed = true;
    hasTop = false;
    if (currBlock != null)
      currBlock.close();

  }

  // ---- iteration state ----
  // position in the index; the entry most recently returned by iiter.next() describes currBlock
  private IndexIterator iiter;
  // entries of currBlock not yet read
  private int entriesLeft;
  private ABlockReader currBlock;
  // current top key (relative-key decoder) and value
  private RelativeKey rk;
  private Value val;
  // key that precedes the current top key in the file
  private Key prevKey = null;
  private Range range = null;
  private boolean hasTop = false;
  private AtomicBoolean interruptFlag;

  @Override
  public Key getTopKey() {
    return rk.getKey();
  }

  @Override
  public Value getTopValue() {
    return val;
  }

  @Override
  public boolean hasTop() {
    return hasTop;
  }

  /** Advances to the next entry; on IOException the iteration state is reset before rethrowing. */
  @Override
  public void next() throws IOException {
    try {
      _next();
    } catch (IOException ioe) {
      reset();
      throw ioe;
    }
  }

  private void _next() throws IOException {

    if (!hasTop)
      throw new IllegalStateException();

    if (entriesLeft == 0) {
      // current block exhausted: move to the next block, or stop when the index has no more entries
      currBlock.close();

      if (iiter.hasNext()) {
        IndexEntry indexEntry = iiter.next();
        entriesLeft = indexEntry.getNumEntries();
        currBlock = getDataBlock(indexEntry);

        checkRange = range.afterEndKey(indexEntry.getKey());
        if (!checkRange)
          hasTop = true;

      } else {
        rk = null;
        val = null;
        hasTop = false;
        return;
      }
    }

    prevKey = rk.getKey();
    rk.readFields(currBlock);
    val.readFields(currBlock);
    entriesLeft--;
    if (checkRange)
      hasTop = !range.afterEndKey(rk.getKey());
  }

  /**
   * Opens the data block described by {@code indexEntry}. Version 3 and 4 files address blocks by number (start block plus the
   * iterator's index); newer versions use the offset and sizes stored in the index entry.
   *
   * @throws IterationInterruptedException if the interrupt flag is set
   */
  private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException {
    if (interruptFlag != null && interruptFlag.get())
      throw new IterationInterruptedException();

    if (version == RINDEX_VER_3 || version == RINDEX_VER_4)
      return reader.getDataBlock(startBlock + iiter.previousIndex());
    else
      return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(), indexEntry.getRawSize());

  }

  /**
   * Positions this reader at the first key in {@code range}.
   *
   * @throws IllegalArgumentException if column families are given or {@code inclusive} is true (filtering is not supported here)
   * @throws IllegalStateException if this reader is closed
   * @throws IterationInterruptedException if the interrupt flag is set
   */
  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {

    if (closed)
      throw new IllegalStateException("Locality group reader closed");

    if (columnFamilies.size() != 0 || inclusive)
      throw new IllegalArgumentException("I do not know how to filter column families");

    if (interruptFlag != null && interruptFlag.get())
      throw new IterationInterruptedException();

    try {
      _seek(range);
    } catch (IOException ioe) {
      reset();
      throw ioe;
    }
  }

  /** Drops the current position and closes the current block; close failures are logged, not thrown. */
  private void reset() {
    rk = null;
    hasTop = false;
    if (currBlock != null) {
      try {
        try {
          currBlock.close();
        } catch (IOException e) {
          log.warn("Failed to close block reader", e);
        }
      } finally {
        currBlock = null;
      }
    }
  }

  /**
   * Seeks to the first key >= the range start. When the reader is still positioned from an earlier seek, it tries to reuse that
   * position (or skip forward within the current block) instead of going back to the index.
   */
  private void _seek(Range range) throws IOException {

    this.range = range;
    this.checkRange = true;

    if (blockCount == 0) {
      // this locality group has no data blocks
      rk = null;
      return;
    }

    Key startKey = range.getStartKey();
    if (startKey == null)
      startKey = new Key();

    boolean reseek = true;

    if (range.afterEndKey(firstKey)) {
      // the range ends before the group's first key, so nothing in this group is in range
      reset();
      reseek = false;
    }

    if (rk != null) {
      // still positioned from a previous seek; check whether that position can be reused
      if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
        // the whole range lies between the previous key and the current key, so it contains nothing;
        // staying put leaves hasTop false below
        reseek = false;
      }

      if (startKey.compareTo(getTopKey()) <= 0 && startKey.compareTo(prevKey) > 0) {
        // prevKey < startKey <= top key: the current key is already the first key >= startKey
        reseek = false;
      }

      if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
        // startKey falls between the current key and the index key of the current block (its last key), so skip forward within
        // the block without consulting the index. Any block index is deliberately not used on this path.
        MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
        SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
        if (skippr.skipped > 0) {
          entriesLeft -= skippr.skipped;
          val = new Value(valbs.toArray());
          prevKey = skippr.prevKey;
          rk = skippr.rk;
        }

        reseek = false;
      }

      if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) {
        // already on the group's first key and the range starts at or before it
        reseek = false;
      }
    }

    if (reseek) {
      iiter = index.lookup(startKey);

      reset();

      if (!iiter.hasNext()) {
        // startKey is past the last key of this group; reset() left rk null
      } else {
        // consecutive index entries may share the same key; back up to the earliest of them
        while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
          iiter.previous();
        }

        // the previous block's index key precedes everything in the block about to be read
        if (iiter.hasPrevious())
          prevKey = new Key(iiter.peekPrevious().getKey());
        else
          prevKey = new Key();

        IndexEntry indexEntry = iiter.next();
        entriesLeft = indexEntry.getNumEntries();
        currBlock = getDataBlock(indexEntry);

        checkRange = range.afterEndKey(indexEntry.getKey());
        if (!checkRange)
          hasTop = true;

        MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);

        Key currKey = null;

        if (currBlock.isIndexable()) {
          BlockIndex blockIndex = BlockIndex.getIndex(currBlock, indexEntry);
          if (blockIndex != null) {
            BlockIndexEntry bie = blockIndex.seekBlock(startKey, currBlock);
            if (bie != null) {
              // NOTE(review): assumes seekBlock left currBlock positioned at the entry described by bie; that entry is read here
              // so fastSkip can continue from it
              RelativeKey tmpRk = new RelativeKey();
              tmpRk.setPrevKey(bie.getPrevKey());
              tmpRk.readFields(currBlock);
              val = new Value();

              val.readFields(currBlock);
              valbs = new MByteSequence(val.get(), 0, val.getSize());

              // one entry was just consumed from the block
              entriesLeft = bie.getEntriesLeft() - 1;
              prevKey = new Key(bie.getPrevKey());
              currKey = tmpRk.getKey();
            }
          }
        }

        SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
        prevKey = skippr.prevKey;
        entriesLeft -= skippr.skipped;
        val = new Value(valbs.toArray());

        // rk is assigned last: if anything above throws, rk stays null from reset()
        rk = skippr.rk;
      }
    }

    hasTop = rk != null && !range.afterEndKey(rk.getKey());

    while (hasTop() && range.beforeStartKey(getTopKey())) {
      next();
    }
  }

  @Override
  public Key getFirstKey() throws IOException {
    return firstKey;
  }

  /** Returns the index's last key, or null when the group has no blocks. */
  @Override
  public Key getLastKey() throws IOException {
    if (index.size() == 0)
      return null;
    return index.getLastKey();
  }

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    throw new UnsupportedOperationException();
  }

  @Override
  public void closeDeepCopies() throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    throw new UnsupportedOperationException();
  }

  @Override
  public DataInputStream getMetaStore(String name) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Sets the flag checked before each seek and before each data block is opened. */
  @Override
  public void setInterruptFlag(AtomicBoolean flag) {
    this.interruptFlag = flag;
  }
}
814
815 public static class Reader extends HeapIterator implements FileSKVIterator {
816
817 private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
818
819 private BlockFileReader reader;
820
821 private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
822
823 private LocalityGroupReader lgReaders[];
824 private HashSet<ByteSequence> nonDefaultColumnFamilies;
825
826 private List<Reader> deepCopies;
827 private boolean deepCopy = false;
828
829 private AtomicBoolean interruptFlag;
830
831 public Reader(BlockFileReader rdr) throws IOException {
832 this.reader = rdr;
833
834 ABlockReader mb = reader.getMetaBlock("RFile.index");
835
836 int magic = mb.readInt();
837 int ver = mb.readInt();
838
839 if (magic != RINDEX_MAGIC)
840 throw new IOException("Did not see expected magic number, saw " + magic);
841 if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
842 throw new IOException("Did not see expected version, saw " + ver);
843
844 int size = mb.readInt();
845 lgReaders = new LocalityGroupReader[size];
846
847 deepCopies = new LinkedList<Reader>();
848
849 for (int i = 0; i < size; i++) {
850 LocalityGroupMetadata lgm = new LocalityGroupMetadata(ver, rdr);
851 lgm.readFields(mb);
852 localityGroups.add(lgm);
853
854 lgReaders[i] = new LocalityGroupReader(reader, lgm, ver);
855 }
856
857 mb.close();
858
859 nonDefaultColumnFamilies = new HashSet<ByteSequence>();
860 for (LocalityGroupMetadata lgm : localityGroups) {
861 if (!lgm.isDefaultLG)
862 nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
863 }
864
865 createHeap(lgReaders.length);
866 }
867
868 private Reader(Reader r) {
869 super(r.lgReaders.length);
870 this.reader = r.reader;
871 this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
872 this.lgReaders = new LocalityGroupReader[r.lgReaders.length];
873 this.deepCopies = r.deepCopies;
874 this.deepCopy = true;
875 for (int i = 0; i < lgReaders.length; i++) {
876 this.lgReaders[i] = new LocalityGroupReader(r.lgReaders[i]);
877 this.lgReaders[i].setInterruptFlag(r.interruptFlag);
878 }
879 }
880
881 private void closeLocalityGroupReaders() {
882 for (LocalityGroupReader lgr : lgReaders) {
883 try {
884 lgr.close();
885 } catch (IOException e) {
886 e.printStackTrace();
887 }
888 }
889 }
890
891 @Override
892 public void closeDeepCopies() {
893 if (deepCopy)
894 throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
895
896 for (Reader deepCopy : deepCopies)
897 deepCopy.closeLocalityGroupReaders();
898
899 deepCopies.clear();
900 }
901
902 @Override
903 public void close() throws IOException {
904 if (deepCopy)
905 throw new RuntimeException("Calling close on a deep copy is not supported");
906
907 closeDeepCopies();
908 closeLocalityGroupReaders();
909
910 try {
911 reader.close();
912 } finally {
913 /**
914 * input Stream is passed to CachableBlockFile and closed there
915 */
916 }
917 }
918
919 @Override
920 public Key getFirstKey() throws IOException {
921 if (lgReaders.length == 0) {
922 return null;
923 }
924
925 Key minKey = null;
926
927 for (int i = 0; i < lgReaders.length; i++) {
928 if (minKey == null) {
929 minKey = lgReaders[i].getFirstKey();
930 } else {
931 Key firstKey = lgReaders[i].getFirstKey();
932 if (firstKey != null && firstKey.compareTo(minKey) < 0)
933 minKey = firstKey;
934 }
935 }
936
937 return minKey;
938 }
939
940 @Override
941 public Key getLastKey() throws IOException {
942 if (lgReaders.length == 0) {
943 return null;
944 }
945
946 Key maxKey = null;
947
948 for (int i = 0; i < lgReaders.length; i++) {
949 if (maxKey == null) {
950 maxKey = lgReaders[i].getLastKey();
951 } else {
952 Key lastKey = lgReaders[i].getLastKey();
953 if (lastKey != null && lastKey.compareTo(maxKey) > 0)
954 maxKey = lastKey;
955 }
956 }
957
958 return maxKey;
959 }
960
961 @Override
962 public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
963 try {
964 return this.reader.getMetaBlock(name).getStream();
965 } catch (MetaBlockDoesNotExist e) {
966 throw new NoSuchMetaStoreException("name = " + name, e);
967 }
968 }
969
970 @Override
971 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
972 Reader copy = new Reader(this);
973 copy.setInterruptFlagInternal(interruptFlag);
974 deepCopies.add(copy);
975 return copy;
976 }
977
978 @Override
979 public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
980 throw new UnsupportedOperationException();
981
982 }
983
984 private int numLGSeeked = 0;
985
986 @Override
987 public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
988
989 clear();
990
991 numLGSeeked = 0;
992
993 Set<ByteSequence> cfSet;
994 if (columnFamilies.size() > 0)
995 if (columnFamilies instanceof Set<?>) {
996 cfSet = (Set<ByteSequence>) columnFamilies;
997 } else {
998 cfSet = new HashSet<ByteSequence>();
999 cfSet.addAll(columnFamilies);
1000 }
1001 else
1002 cfSet = Collections.emptySet();
1003
1004 for (LocalityGroupReader lgr : lgReaders) {
1005
1006
1007
1008 boolean include = false;
1009
1010 if (cfSet.size() == 0) {
1011 include = !inclusive;
1012 } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
1013
1014
1015
1016 if (inclusive) {
1017 if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
1018
1019 include = true;
1020 }
1021 } else {
1022
1023
1024 include = true;
1025 }
1026 } else {
1027
1028
1029
1030
1031
1032 for (Entry<ByteSequence,Count> entry : lgr.columnFamilies.entrySet())
1033 if (entry.getValue().count > 0)
1034 if (cfSet.contains(entry.getKey())) {
1035 if (inclusive)
1036 include = true;
1037 } else if (!inclusive) {
1038 include = true;
1039 }
1040 }
1041
1042 if (include) {
1043 lgr.seek(range, EMPTY_CF_SET, false);
1044 addSource(lgr);
1045 numLGSeeked++;
1046 }
1047 }
1048 }
1049
1050 int getNumLocalityGroupsSeeked() {
1051 return numLGSeeked;
1052 }
1053
1054 public FileSKVIterator getIndex() throws IOException {
1055
1056 ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
1057
1058 for (LocalityGroupReader lgr : lgReaders) {
1059 indexes.add(lgr.getIndex());
1060 }
1061
1062 return new MultiIndexIterator(this, indexes);
1063 }
1064
1065 public void printInfo() throws IOException {
1066 for (LocalityGroupMetadata lgm : localityGroups) {
1067 lgm.printInfo();
1068 }
1069
1070 }
1071
1072 @Override
1073 public void setInterruptFlag(AtomicBoolean flag) {
1074 if (deepCopy)
1075 throw new RuntimeException("Calling setInterruptFlag on a deep copy is not supported");
1076
1077 if (deepCopies.size() != 0)
1078 throw new RuntimeException("Setting interrupt flag after calling deep copy not supported");
1079
1080 setInterruptFlagInternal(flag);
1081 }
1082
1083 private void setInterruptFlagInternal(AtomicBoolean flag) {
1084 this.interruptFlag = flag;
1085 for (LocalityGroupReader lgr : lgReaders) {
1086 lgr.setInterruptFlag(interruptFlag);
1087 }
1088 }
1089 }
1090 }