View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.file.rfile;
18  
19  import java.io.DataInput;
20  import java.io.DataInputStream;
21  import java.io.DataOutput;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.io.PrintStream;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.Set;
36  import java.util.TreeMap;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.accumulo.core.conf.AccumuloConfiguration;
40  import org.apache.accumulo.core.conf.Property;
41  import org.apache.accumulo.core.data.ArrayByteSequence;
42  import org.apache.accumulo.core.data.ByteSequence;
43  import org.apache.accumulo.core.data.Key;
44  import org.apache.accumulo.core.data.Range;
45  import org.apache.accumulo.core.data.Value;
46  import org.apache.accumulo.core.file.FileSKVIterator;
47  import org.apache.accumulo.core.file.FileSKVWriter;
48  import org.apache.accumulo.core.file.NoSuchMetaStoreException;
49  import org.apache.accumulo.core.file.blockfile.ABlockReader;
50  import org.apache.accumulo.core.file.blockfile.ABlockWriter;
51  import org.apache.accumulo.core.file.blockfile.BlockFileReader;
52  import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
53  import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
54  import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
55  import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
56  import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
57  import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
58  import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
59  import org.apache.accumulo.core.iterators.IterationInterruptedException;
60  import org.apache.accumulo.core.iterators.IteratorEnvironment;
61  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
62  import org.apache.accumulo.core.iterators.system.HeapIterator;
63  import org.apache.hadoop.io.Writable;
64  import org.apache.log4j.Logger;
65  
66  public class RFile {
67    
68    public static final String EXTENSION = "rf";
69    
70    private static final Logger log = Logger.getLogger(RFile.class);
71    
72    private RFile() {}
73    
74    private static final int RINDEX_MAGIC = 0x20637474;
75    static final int RINDEX_VER_7 = 7;
76    static final int RINDEX_VER_6 = 6;
77    // static final int RINDEX_VER_5 = 5; // unreleased
78    static final int RINDEX_VER_4 = 4;
79    static final int RINDEX_VER_3 = 3;
80    
81    private static class Count {
82      public Count(int i) {
83        this.count = i;
84      }
85      
86      public Count(long count) {
87        this.count = count;
88      }
89      
90      long count;
91    }
92    
93    private static class LocalityGroupMetadata implements Writable {
94      
95      private int startBlock;
96      private Key firstKey;
97      private Map<ByteSequence,Count> columnFamilies;
98      
99      private boolean isDefaultLG = false;
100     private String name;
101     private Set<ByteSequence> previousColumnFamilies;
102     
103     private MultiLevelIndex.BufferedWriter indexWriter;
104     private MultiLevelIndex.Reader indexReader;
105     
106     public LocalityGroupMetadata(int version, BlockFileReader br) {
107       columnFamilies = new HashMap<ByteSequence,Count>();
108       indexReader = new MultiLevelIndex.Reader(br, version);
109     }
110     
111     public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
112       this.startBlock = nextBlock;
113       isDefaultLG = true;
114       columnFamilies = new HashMap<ByteSequence,Count>();
115       previousColumnFamilies = pcf;
116       
117       indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
118     }
119     
120     public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
121       this.startBlock = nextBlock;
122       this.name = name;
123       isDefaultLG = false;
124       columnFamilies = new HashMap<ByteSequence,Count>();
125       for (ByteSequence cf : cfset) {
126         columnFamilies.put(cf, new Count(0));
127       }
128       
129       indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
130     }
131     
132     private Key getFirstKey() {
133       return firstKey;
134     }
135     
136     private void setFirstKey(Key key) {
137       if (firstKey != null)
138         throw new IllegalStateException();
139       this.firstKey = new Key(key);
140     }
141     
142     public void updateColumnCount(Key key) {
143       
144       if (isDefaultLG && columnFamilies == null) {
145         if (previousColumnFamilies.size() > 0) {
146           // only do this check when there are previous column families
147           ByteSequence cf = key.getColumnFamilyData();
148           if (previousColumnFamilies.contains(cf)) {
149             throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
150           }
151         }
152         
153         // no longer keeping track of column families, so return
154         return;
155       }
156       
157       ByteSequence cf = key.getColumnFamilyData();
158       Count count = columnFamilies.get(cf);
159       
160       if (count == null) {
161         if (!isDefaultLG) {
162           throw new IllegalArgumentException("invalid column family : " + cf);
163         }
164         
165         if (previousColumnFamilies.contains(cf)) {
166           throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
167         }
168         
169         if (columnFamilies.size() > Writer.MAX_CF_IN_DLG) {
170           // stop keeping track, there are too many
171           columnFamilies = null;
172           return;
173         }
174         count = new Count(0);
175         columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
176         
177       }
178       
179       count.count++;
180       
181     }
182     
183     @Override
184     public void readFields(DataInput in) throws IOException {
185       
186       isDefaultLG = in.readBoolean();
187       if (!isDefaultLG) {
188         name = in.readUTF();
189       }
190       
191       startBlock = in.readInt();
192       
193       int size = in.readInt();
194       
195       if (size == -1) {
196         if (!isDefaultLG)
197           throw new IllegalStateException("Non default LG " + name + " does not have column families");
198         
199         columnFamilies = null;
200       } else {
201         if (columnFamilies == null)
202           columnFamilies = new HashMap<ByteSequence,Count>();
203         else
204           columnFamilies.clear();
205         
206         for (int i = 0; i < size; i++) {
207           int len = in.readInt();
208           byte cf[] = new byte[len];
209           in.readFully(cf);
210           long count = in.readLong();
211           
212           columnFamilies.put(new ArrayByteSequence(cf), new Count(count));
213         }
214       }
215       
216       if (in.readBoolean()) {
217         firstKey = new Key();
218         firstKey.readFields(in);
219       } else {
220         firstKey = null;
221       }
222       
223       indexReader.readFields(in);
224     }
225     
226     @Override
227     public void write(DataOutput out) throws IOException {
228       
229       out.writeBoolean(isDefaultLG);
230       if (!isDefaultLG) {
231         out.writeUTF(name);
232       }
233       
234       out.writeInt(startBlock);
235       
236       if (isDefaultLG && columnFamilies == null) {
237         // only expect null when default LG, otherwise let a NPE occur
238         out.writeInt(-1);
239       } else {
240         out.writeInt(columnFamilies.size());
241         
242         for (Entry<ByteSequence,Count> entry : columnFamilies.entrySet()) {
243           out.writeInt(entry.getKey().length());
244           out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
245           out.writeLong(entry.getValue().count);
246         }
247       }
248       
249       out.writeBoolean(firstKey != null);
250       if (firstKey != null)
251         firstKey.write(out);
252       
253       indexWriter.close(out);
254     }
255     
256     public void printInfo() throws IOException {
257       PrintStream out = System.out;
258       out.println("Locality group         : " + (isDefaultLG ? "<DEFAULT>" : name));
259       out.println("\tStart block          : " + startBlock);
260       out.println("\tNum   blocks         : " + String.format("%,d", indexReader.size()));
261       TreeMap<Integer,Long> sizesByLevel = new TreeMap<Integer,Long>();
262       TreeMap<Integer,Long> countsByLevel = new TreeMap<Integer,Long>();
263       indexReader.getIndexInfo(sizesByLevel, countsByLevel);
264       for (Entry<Integer,Long> entry : sizesByLevel.descendingMap().entrySet()) {
265         out.println("\tIndex level " + entry.getKey() + "        : "
266             + String.format("%,d bytes  %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
267       }
268       out.println("\tFirst key            : " + firstKey);
269       
270       Key lastKey = null;
271       if (indexReader != null && indexReader.size() > 0) {
272         lastKey = indexReader.getLastKey();
273       }
274       
275       out.println("\tLast key             : " + lastKey);
276       
277       long numKeys = 0;
278       IndexIterator countIter = indexReader.lookup(new Key());
279       while (countIter.hasNext()) {
280         numKeys += countIter.next().getNumEntries();
281       }
282       
283       out.println("\tNum entries          : " + String.format("%,d", numKeys));
284       out.println("\tColumn families      : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
285     }
286     
287   }
288   
289   public static class Writer implements FileSKVWriter {
290     
291     public static final int MAX_CF_IN_DLG = 1000;
292     
293     private BlockFileWriter fileWriter;
294     private ABlockWriter blockWriter;
295     
296     // private BlockAppender blockAppender;
297     private long blockSize = 100000;
298     private int indexBlockSize;
299     private int entries = 0;
300     
301     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
302     private LocalityGroupMetadata currentLocalityGroup = null;
303     private int nextBlock = 0;
304     
305     private Key lastKeyInBlock = null;
306     
307     private boolean dataClosed = false;
308     private boolean closed = false;
309     private Key prevKey = new Key();
310     private boolean startedDefaultLocalityGroup = false;
311     
312     private HashSet<ByteSequence> previousColumnFamilies;
313     
314     public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
315       this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
316     }
317     
318     public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
319       this.blockSize = blockSize;
320       this.indexBlockSize = indexBlockSize;
321       this.fileWriter = bfw;
322       this.blockWriter = null;
323       previousColumnFamilies = new HashSet<ByteSequence>();
324     }
325     
326     @Override
327     public synchronized void close() throws IOException {
328       
329       if (closed) {
330         return;
331       }
332       
333       closeData();
334       
335       ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
336       
337       mba.writeInt(RINDEX_MAGIC);
338       mba.writeInt(RINDEX_VER_7);
339       
340       if (currentLocalityGroup != null)
341         localityGroups.add(currentLocalityGroup);
342       
343       mba.writeInt(localityGroups.size());
344       
345       for (LocalityGroupMetadata lc : localityGroups) {
346         lc.write(mba);
347       }
348       
349       mba.close();
350       
351       fileWriter.close();
352       
353       closed = true;
354     }
355     
356     private void closeData() throws IOException {
357       
358       if (dataClosed) {
359         return;
360       }
361       
362       dataClosed = true;
363       
364       if (blockWriter != null) {
365         closeBlock(lastKeyInBlock, true);
366       }
367     }
368     
369     @Override
370     public void append(Key key, Value value) throws IOException {
371       
372       if (dataClosed) {
373         throw new IllegalStateException("Cannont append, data closed");
374       }
375       
376       if (key.compareTo(prevKey) < 0) {
377         throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
378       }
379       
380       currentLocalityGroup.updateColumnCount(key);
381       
382       if (currentLocalityGroup.getFirstKey() == null) {
383         currentLocalityGroup.setFirstKey(key);
384       }
385       
386       if (blockWriter == null) {
387         blockWriter = fileWriter.prepareDataBlock();
388       } else if (blockWriter.getRawSize() > blockSize) {
389         closeBlock(prevKey, false);
390         blockWriter = fileWriter.prepareDataBlock();
391       }
392       
393       RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
394       
395       rk.write(blockWriter);
396       value.write(blockWriter);
397       entries++;
398       
399       prevKey = new Key(key);
400       lastKeyInBlock = prevKey;
401       
402     }
403     
404     private void closeBlock(Key key, boolean lastBlock) throws IOException {
405       blockWriter.close();
406       
407       if (lastBlock)
408         currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
409       else
410         currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
411       
412       blockWriter = null;
413       lastKeyInBlock = null;
414       entries = 0;
415       nextBlock++;
416     }
417     
418     @Override
419     public DataOutputStream createMetaStore(String name) throws IOException {
420       closeData();
421       
422       return (DataOutputStream) fileWriter.prepareMetaBlock(name);
423     }
424     
425     private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
426       if (dataClosed) {
427         throw new IllegalStateException("data closed");
428       }
429       
430       if (startedDefaultLocalityGroup) {
431         throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
432       }
433       
434       if (blockWriter != null) {
435         closeBlock(lastKeyInBlock, true);
436       }
437       
438       if (currentLocalityGroup != null) {
439         localityGroups.add(currentLocalityGroup);
440       }
441       
442       if (columnFamilies == null) {
443         startedDefaultLocalityGroup = true;
444         currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
445       } else {
446         if (!Collections.disjoint(columnFamilies, previousColumnFamilies)) {
447           HashSet<ByteSequence> overlap = new HashSet<ByteSequence>(columnFamilies);
448           overlap.retainAll(previousColumnFamilies);
449           throw new IllegalArgumentException("Column families over lap with previous locality group : " + overlap);
450         }
451         currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
452         previousColumnFamilies.addAll(columnFamilies);
453       }
454       
455       prevKey = new Key();
456     }
457     
458     @Override
459     public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
460       if (columnFamilies == null)
461         throw new NullPointerException();
462       
463       _startNewLocalityGroup(name, columnFamilies);
464     }
465     
466     @Override
467     public void startDefaultLocalityGroup() throws IOException {
468       _startNewLocalityGroup(null, null);
469     }
470     
471     @Override
472     public boolean supportsLocalityGroups() {
473       return true;
474     }
475   }
476   
477   private static class LocalityGroupReader implements FileSKVIterator {
478     
479     private BlockFileReader reader;
480     private MultiLevelIndex.Reader index;
481     private int blockCount;
482     private Key firstKey;
483     private int startBlock;
484     private Map<ByteSequence,Count> columnFamilies;
485     private boolean isDefaultLocalityGroup;
486     private boolean closed = false;
487     private int version;
488     private boolean checkRange = true;
489     
490     private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
491       this.firstKey = lgm.firstKey;
492       this.index = lgm.indexReader;
493       this.startBlock = lgm.startBlock;
494       blockCount = index.size();
495       this.columnFamilies = lgm.columnFamilies;
496       this.isDefaultLocalityGroup = lgm.isDefaultLG;
497       this.version = version;
498       
499       this.reader = reader;
500       
501     }
502     
503     public LocalityGroupReader(LocalityGroupReader lgr) {
504       this.firstKey = lgr.firstKey;
505       this.index = lgr.index;
506       this.startBlock = lgr.startBlock;
507       this.blockCount = lgr.blockCount;
508       this.columnFamilies = lgr.columnFamilies;
509       this.isDefaultLocalityGroup = lgr.isDefaultLocalityGroup;
510       this.reader = lgr.reader;
511       this.version = lgr.version;
512     }
513     
514     Iterator<IndexEntry> getIndex() throws IOException {
515       return index.lookup(new Key());
516     }
517     
518     @Override
519     public void close() throws IOException {
520       closed = true;
521       hasTop = false;
522       if (currBlock != null)
523         currBlock.close();
524       
525     }
526     
527     private IndexIterator iiter;
528     private int entriesLeft;
529     private ABlockReader currBlock;
530     private RelativeKey rk;
531     private Value val;
532     private Key prevKey = null;
533     private Range range = null;
534     private boolean hasTop = false;
535     private AtomicBoolean interruptFlag;
536     
537     @Override
538     public Key getTopKey() {
539       return rk.getKey();
540     }
541     
542     @Override
543     public Value getTopValue() {
544       return val;
545     }
546     
547     @Override
548     public boolean hasTop() {
549       return hasTop;
550     }
551     
552     @Override
553     public void next() throws IOException {
554       try {
555         _next();
556       } catch (IOException ioe) {
557         reset();
558         throw ioe;
559       }
560     }
561     
562     private void _next() throws IOException {
563       
564       if (!hasTop)
565         throw new IllegalStateException();
566       
567       if (entriesLeft == 0) {
568         currBlock.close();
569         
570         if (iiter.hasNext()) {
571           IndexEntry indexEntry = iiter.next();
572           entriesLeft = indexEntry.getNumEntries();
573           currBlock = getDataBlock(indexEntry);
574           
575           checkRange = range.afterEndKey(indexEntry.getKey());
576           if (!checkRange)
577             hasTop = true;
578 
579         } else {
580           rk = null;
581           val = null;
582           hasTop = false;
583           return;
584         }
585       }
586       
587       prevKey = rk.getKey();
588       rk.readFields(currBlock);
589       val.readFields(currBlock);
590       entriesLeft--;
591       if (checkRange)
592         hasTop = !range.afterEndKey(rk.getKey());
593     }
594     
595     private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException {
596       if (interruptFlag != null && interruptFlag.get())
597         throw new IterationInterruptedException();
598       
599       if (version == RINDEX_VER_3 || version == RINDEX_VER_4)
600         return reader.getDataBlock(startBlock + iiter.previousIndex());
601       else
602         return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(), indexEntry.getRawSize());
603       
604     }
605     
606     @Override
607     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
608       
609       if (closed)
610         throw new IllegalStateException("Locality group reader closed");
611       
612       if (columnFamilies.size() != 0 || inclusive)
613         throw new IllegalArgumentException("I do not know how to filter column families");
614       
615       if (interruptFlag != null && interruptFlag.get())
616         throw new IterationInterruptedException();
617       
618       try {
619         _seek(range);
620       } catch (IOException ioe) {
621         reset();
622         throw ioe;
623       }
624     }
625     
626     private void reset() {
627       rk = null;
628       hasTop = false;
629       if (currBlock != null) {
630         try {
631           try {
632             currBlock.close();
633           } catch (IOException e) {
634             log.warn("Failed to close block reader", e);
635           }
636         } finally {
637           currBlock = null;
638         }
639       }
640     }
641     
642     private void _seek(Range range) throws IOException {
643       
644       this.range = range;
645       this.checkRange = true;
646       
647       if (blockCount == 0) {
648         // its an empty file
649         rk = null;
650         return;
651       }
652       
653       Key startKey = range.getStartKey();
654       if (startKey == null)
655         startKey = new Key();
656       
657       boolean reseek = true;
658       
659       if (range.afterEndKey(firstKey)) {
660         // range is before first key in rfile, so there is nothing to do
661         reset();
662         reseek = false;
663       }
664       
665       if (rk != null) {
666         if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
667           // range is between the two keys in the file where the last range seeked to stopped, so there is
668           // nothing to do
669           reseek = false;
670         }
671         
672         if (startKey.compareTo(getTopKey()) <= 0 && startKey.compareTo(prevKey) > 0) {
673           // current location in file can satisfy this request, no need to seek
674           reseek = false;
675         }
676         
677         if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
678           // start key is within the unconsumed portion of the current block
679           
680           // this code intentionally does not use the index associated with a cached block
681           // because if only forward seeks are being done, then there is no benefit to building
682           // and index for the block... could consider using the index if it exist but not
683           // causing the build of an index... doing this could slow down some use cases and
684           // and speed up others.
685 
686           MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
687           SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
688           if (skippr.skipped > 0) {
689             entriesLeft -= skippr.skipped;
690             val = new Value(valbs.toArray());
691             prevKey = skippr.prevKey;
692             rk = skippr.rk;
693           }
694           
695           reseek = false;
696         }
697         
698         if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) {
699           // seeking before the beginning of the file, and already positioned at the first key in the file
700           // so there is nothing to do
701           reseek = false;
702         }
703       }
704       
705       if (reseek) {
706         iiter = index.lookup(startKey);
707         
708         reset();
709         
710         if (!iiter.hasNext()) {
711           // past the last key
712         } else {
713           
714           // if the index contains the same key multiple times, then go to the
715           // earliest index entry containing the key
716           while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
717             iiter.previous();
718           }
719           
720           if (iiter.hasPrevious())
721             prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block
722           else
723             prevKey = new Key(); // first block in the file, so set prev key to minimal key
724             
725           IndexEntry indexEntry = iiter.next();
726           entriesLeft = indexEntry.getNumEntries();
727           currBlock = getDataBlock(indexEntry);
728 
729           checkRange = range.afterEndKey(indexEntry.getKey());
730           if (!checkRange)
731             hasTop = true;
732 
733           MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
734 
735           Key currKey = null;
736 
737           if (currBlock.isIndexable()) {
738             BlockIndex blockIndex = BlockIndex.getIndex(currBlock, indexEntry);
739             if (blockIndex != null) {
740               BlockIndexEntry bie = blockIndex.seekBlock(startKey, currBlock);
741               if (bie != null) {
742                 // we are seeked to the current position of the key in the index
743                 // need to prime the read process and read this key from the block
744                 RelativeKey tmpRk = new RelativeKey();
745                 tmpRk.setPrevKey(bie.getPrevKey());
746                 tmpRk.readFields(currBlock);
747                 val = new Value();
748 
749                 val.readFields(currBlock);
750                 valbs = new MByteSequence(val.get(), 0, val.getSize());
751                 
752                 // just consumed one key from the input stream, so subtract one from entries left
753                 entriesLeft = bie.getEntriesLeft() - 1;
754                 prevKey = new Key(bie.getPrevKey());
755                 currKey = tmpRk.getKey();
756               }
757             }
758           }
759 
760           SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
761           prevKey = skippr.prevKey;
762           entriesLeft -= skippr.skipped;
763           val = new Value(valbs.toArray());
764           // set rk when everything above is successful, if exception
765           // occurs rk will not be set
766           rk = skippr.rk;
767         }
768       }
769       
770       hasTop = rk != null && !range.afterEndKey(rk.getKey());
771       
772       while (hasTop() && range.beforeStartKey(getTopKey())) {
773         next();
774       }
775     }
776     
777     @Override
778     public Key getFirstKey() throws IOException {
779       return firstKey;
780     }
781     
782     @Override
783     public Key getLastKey() throws IOException {
784       if (index.size() == 0)
785         return null;
786       return index.getLastKey();
787     }
788     
789     @Override
790     public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
791       throw new UnsupportedOperationException();
792     }
793     
794     @Override
795     public void closeDeepCopies() throws IOException {
796       throw new UnsupportedOperationException();
797     }
798     
799     @Override
800     public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
801       throw new UnsupportedOperationException();
802     }
803     
804     @Override
805     public DataInputStream getMetaStore(String name) throws IOException {
806       throw new UnsupportedOperationException();
807     }
808     
809     @Override
810     public void setInterruptFlag(AtomicBoolean flag) {
811       this.interruptFlag = flag;
812     }
813   }
814   
815   public static class Reader extends HeapIterator implements FileSKVIterator {
816     
817     private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
818     
819     private BlockFileReader reader;
820     
821     private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
822     
823     private LocalityGroupReader lgReaders[];
824     private HashSet<ByteSequence> nonDefaultColumnFamilies;
825     
826     private List<Reader> deepCopies;
827     private boolean deepCopy = false;
828     
829     private AtomicBoolean interruptFlag;
830     
831     public Reader(BlockFileReader rdr) throws IOException {
832       this.reader = rdr;
833       
834       ABlockReader mb = reader.getMetaBlock("RFile.index");
835       
836       int magic = mb.readInt();
837       int ver = mb.readInt();
838       
839       if (magic != RINDEX_MAGIC)
840         throw new IOException("Did not see expected magic number, saw " + magic);
841       if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
842         throw new IOException("Did not see expected version, saw " + ver);
843       
844       int size = mb.readInt();
845       lgReaders = new LocalityGroupReader[size];
846       
847       deepCopies = new LinkedList<Reader>();
848       
849       for (int i = 0; i < size; i++) {
850         LocalityGroupMetadata lgm = new LocalityGroupMetadata(ver, rdr);
851         lgm.readFields(mb);
852         localityGroups.add(lgm);
853         
854         lgReaders[i] = new LocalityGroupReader(reader, lgm, ver);
855       }
856       
857       mb.close();
858       
859       nonDefaultColumnFamilies = new HashSet<ByteSequence>();
860       for (LocalityGroupMetadata lgm : localityGroups) {
861         if (!lgm.isDefaultLG)
862           nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
863       }
864       
865       createHeap(lgReaders.length);
866     }
867     
868     private Reader(Reader r) {
869       super(r.lgReaders.length);
870       this.reader = r.reader;
871       this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
872       this.lgReaders = new LocalityGroupReader[r.lgReaders.length];
873       this.deepCopies = r.deepCopies;
874       this.deepCopy = true;
875       for (int i = 0; i < lgReaders.length; i++) {
876         this.lgReaders[i] = new LocalityGroupReader(r.lgReaders[i]);
877         this.lgReaders[i].setInterruptFlag(r.interruptFlag);
878       }
879     }
880     
881     private void closeLocalityGroupReaders() {
882       for (LocalityGroupReader lgr : lgReaders) {
883         try {
884           lgr.close();
885         } catch (IOException e) {
886           e.printStackTrace();
887         }
888       }
889     }
890     
891     @Override
892     public void closeDeepCopies() {
893       if (deepCopy)
894         throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
895       
896       for (Reader deepCopy : deepCopies)
897         deepCopy.closeLocalityGroupReaders();
898       
899       deepCopies.clear();
900     }
901     
902     @Override
903     public void close() throws IOException {
904       if (deepCopy)
905         throw new RuntimeException("Calling close on a deep copy is not supported");
906       
907       closeDeepCopies();
908       closeLocalityGroupReaders();
909       
910       try {
911         reader.close();
912       } finally {
913         /**
914          * input Stream is passed to CachableBlockFile and closed there
915          */
916       }
917     }
918     
919     @Override
920     public Key getFirstKey() throws IOException {
921       if (lgReaders.length == 0) {
922         return null;
923       }
924       
925       Key minKey = null;
926       
927       for (int i = 0; i < lgReaders.length; i++) {
928         if (minKey == null) {
929           minKey = lgReaders[i].getFirstKey();
930         } else {
931           Key firstKey = lgReaders[i].getFirstKey();
932           if (firstKey != null && firstKey.compareTo(minKey) < 0)
933             minKey = firstKey;
934         }
935       }
936       
937       return minKey;
938     }
939     
940     @Override
941     public Key getLastKey() throws IOException {
942       if (lgReaders.length == 0) {
943         return null;
944       }
945       
946       Key maxKey = null;
947       
948       for (int i = 0; i < lgReaders.length; i++) {
949         if (maxKey == null) {
950           maxKey = lgReaders[i].getLastKey();
951         } else {
952           Key lastKey = lgReaders[i].getLastKey();
953           if (lastKey != null && lastKey.compareTo(maxKey) > 0)
954             maxKey = lastKey;
955         }
956       }
957       
958       return maxKey;
959     }
960     
961     @Override
962     public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
963       try {
964         return this.reader.getMetaBlock(name).getStream();
965       } catch (MetaBlockDoesNotExist e) {
966         throw new NoSuchMetaStoreException("name = " + name, e);
967       }
968     }
969     
970     @Override
971     public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
972       Reader copy = new Reader(this);
973       copy.setInterruptFlagInternal(interruptFlag);
974       deepCopies.add(copy);
975       return copy;
976     }
977     
978     @Override
979     public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
980       throw new UnsupportedOperationException();
981       
982     }
983     
984     private int numLGSeeked = 0;
985     
986     @Override
987     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
988       
989       clear();
990       
991       numLGSeeked = 0;
992       
993       Set<ByteSequence> cfSet;
994       if (columnFamilies.size() > 0)
995         if (columnFamilies instanceof Set<?>) {
996           cfSet = (Set<ByteSequence>) columnFamilies;
997         } else {
998           cfSet = new HashSet<ByteSequence>();
999           cfSet.addAll(columnFamilies);
1000         }
1001       else
1002         cfSet = Collections.emptySet();
1003       
1004       for (LocalityGroupReader lgr : lgReaders) {
1005         
1006         // when include is set to true it means this locality groups contains
1007         // wanted column families
1008         boolean include = false;
1009         
1010         if (cfSet.size() == 0) {
1011           include = !inclusive;
1012         } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
1013           // do not know what column families are in the default locality group,
1014           // only know what column families are not in it
1015           
1016           if (inclusive) {
1017             if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
1018               // default LG may contain wanted and unwanted column families
1019               include = true;
1020             }// else - everything wanted is in other locality groups, so nothing to do
1021           } else {
1022             // must include, if all excluded column families are in other locality groups
1023             // then there are not unwanted column families in default LG
1024             include = true;
1025           }
1026         } else {
1027           /*
1028            * Need to consider the following cases for inclusive and exclusive (lgcf:locality group column family set, cf:column family set) lgcf and cf are
1029            * disjoint lgcf and cf are the same cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the other
1030            */
1031           
1032           for (Entry<ByteSequence,Count> entry : lgr.columnFamilies.entrySet())
1033             if (entry.getValue().count > 0)
1034               if (cfSet.contains(entry.getKey())) {
1035                 if (inclusive)
1036                   include = true;
1037               } else if (!inclusive) {
1038                 include = true;
1039               }
1040         }
1041         
1042         if (include) {
1043           lgr.seek(range, EMPTY_CF_SET, false);
1044           addSource(lgr);
1045           numLGSeeked++;
1046         }// every column family is excluded, zero count, or not present
1047       }
1048     }
1049     
1050     int getNumLocalityGroupsSeeked() {
1051       return numLGSeeked;
1052     }
1053     
1054     public FileSKVIterator getIndex() throws IOException {
1055       
1056       ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
1057       
1058       for (LocalityGroupReader lgr : lgReaders) {
1059         indexes.add(lgr.getIndex());
1060       }
1061       
1062       return new MultiIndexIterator(this, indexes);
1063     }
1064     
1065     public void printInfo() throws IOException {
1066       for (LocalityGroupMetadata lgm : localityGroups) {
1067         lgm.printInfo();
1068       }
1069       
1070     }
1071     
1072     @Override
1073     public void setInterruptFlag(AtomicBoolean flag) {
1074       if (deepCopy)
1075         throw new RuntimeException("Calling setInterruptFlag on a deep copy is not supported");
1076       
1077       if (deepCopies.size() != 0)
1078         throw new RuntimeException("Setting interrupt flag after calling deep copy not supported");
1079       
1080       setInterruptFlagInternal(flag);
1081     }
1082     
1083     private void setInterruptFlagInternal(AtomicBoolean flag) {
1084       this.interruptFlag = flag;
1085       for (LocalityGroupReader lgr : lgReaders) {
1086         lgr.setInterruptFlag(interruptFlag);
1087       }
1088     }
1089   }
1090 }