View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.file.rfile;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInput;
22  import java.io.DataInputStream;
23  import java.io.DataOutput;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.util.AbstractList;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.List;
31  import java.util.ListIterator;
32  import java.util.Map;
33  import java.util.RandomAccess;
34  
35  import org.apache.accumulo.core.data.Key;
36  import org.apache.accumulo.core.file.blockfile.ABlockReader;
37  import org.apache.accumulo.core.file.blockfile.ABlockWriter;
38  import org.apache.accumulo.core.file.blockfile.BlockFileReader;
39  import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
40  import org.apache.accumulo.core.file.rfile.bcfile.Utils;
41  import org.apache.hadoop.io.WritableComparable;
42  
43  public class MultiLevelIndex {
44    
45    public static class IndexEntry implements WritableComparable<IndexEntry> {
46      private Key key;
47      private int entries;
48      private long offset;
49      private long compressedSize;
50      private long rawSize;
51      private boolean newFormat;
52      
53      IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
54        this.key = k;
55        this.entries = e;
56        this.offset = offset;
57        this.compressedSize = compressedSize;
58        this.rawSize = rawSize;
59        newFormat = true;
60      }
61      
62      public IndexEntry(boolean newFormat) {
63        this.newFormat = newFormat;
64      }
65      
66      @Override
67      public void readFields(DataInput in) throws IOException {
68        key = new Key();
69        key.readFields(in);
70        entries = in.readInt();
71        if (newFormat) {
72          offset = Utils.readVLong(in);
73          compressedSize = Utils.readVLong(in);
74          rawSize = Utils.readVLong(in);
75        } else {
76          offset = -1;
77          compressedSize = -1;
78          rawSize = -1;
79        }
80      }
81      
82      @Override
83      public void write(DataOutput out) throws IOException {
84        key.write(out);
85        out.writeInt(entries);
86        if (newFormat) {
87          Utils.writeVLong(out, offset);
88          Utils.writeVLong(out, compressedSize);
89          Utils.writeVLong(out, rawSize);
90        }
91      }
92      
93      public Key getKey() {
94        return key;
95      }
96      
97      public int getNumEntries() {
98        return entries;
99      }
100     
101     public long getOffset() {
102       return offset;
103     }
104     
105     public long getCompressedSize() {
106       return compressedSize;
107     }
108     
109     public long getRawSize() {
110       return rawSize;
111     }
112     
113     @Override
114     public int compareTo(IndexEntry o) {
115       return key.compareTo(o.key);
116     }
117   }
118   
119   // a list that deserializes index entries on demand
120   private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
121     
122     private int[] offsets;
123     private byte[] data;
124     private boolean newFormat;
125     
126     SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
127       this.offsets = offsets;
128       this.data = data;
129       this.newFormat = newFormat;
130     }
131     
132     @Override
133     public IndexEntry get(int index) {
134       int len;
135       if (index == offsets.length - 1)
136         len = data.length - offsets[index];
137       else
138         len = offsets[index + 1] - offsets[index];
139       
140       ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
141       DataInputStream dis = new DataInputStream(bais);
142       
143       IndexEntry ie = new IndexEntry(newFormat);
144       try {
145         ie.readFields(dis);
146       } catch (IOException e) {
147         throw new RuntimeException(e);
148       }
149       
150       return ie;
151     }
152     
153     @Override
154     public int size() {
155       return offsets.length;
156     }
157     
158     public long sizeInBytes() {
159       return data.length + 4 * offsets.length;
160     }
161     
162   }
163   
164   private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess {
165     
166     private int[] offsets;
167     private byte[] data;
168     
169     KeyIndex(int[] offsets, byte[] data) {
170       this.offsets = offsets;
171       this.data = data;
172     }
173     
174     @Override
175     public Key get(int index) {
176       int len;
177       if (index == offsets.length - 1)
178         len = data.length - offsets[index];
179       else
180         len = offsets[index + 1] - offsets[index];
181       
182       ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
183       DataInputStream dis = new DataInputStream(bais);
184       
185       Key key = new Key();
186       try {
187         key.readFields(dis);
188       } catch (IOException e) {
189         throw new RuntimeException(e);
190       }
191       
192       return key;
193     }
194     
195     @Override
196     public int size() {
197       return offsets.length;
198     }
199   }
200   
201   static class IndexBlock {
202     
203     private ByteArrayOutputStream indexBytes;
204     private DataOutputStream indexOut;
205     
206     private ArrayList<Integer> offsets;
207     private int level;
208     private int offset;
209     
210     SerializedIndex index;
211     KeyIndex keyIndex;
212     private boolean hasNext;
213     
214     public IndexBlock(int level, int totalAdded) {
215       // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
216       
217       this.level = level;
218       this.offset = totalAdded;
219       
220       indexBytes = new ByteArrayOutputStream();
221       indexOut = new DataOutputStream(indexBytes);
222       offsets = new ArrayList<Integer>();
223     }
224     
225     public IndexBlock() {}
226     
227     public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
228       offsets.add(indexOut.size());
229       new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
230     }
231     
232     int getSize() {
233       return indexOut.size() + 4 * offsets.size();
234     }
235     
236     public void write(DataOutput out) throws IOException {
237       out.writeInt(level);
238       out.writeInt(offset);
239       out.writeBoolean(hasNext);
240       
241       out.writeInt(offsets.size());
242       for (Integer offset : offsets) {
243         out.writeInt(offset);
244       }
245       
246       indexOut.close();
247       byte[] indexData = indexBytes.toByteArray();
248       
249       out.writeInt(indexData.length);
250       out.write(indexData);
251     }
252     
253     public void readFields(DataInput in, int version) throws IOException {
254       
255       if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
256         level = in.readInt();
257         offset = in.readInt();
258         hasNext = in.readBoolean();
259         
260         int numOffsets = in.readInt();
261         int[] offsets = new int[numOffsets];
262         
263         for (int i = 0; i < numOffsets; i++)
264           offsets[i] = in.readInt();
265         
266         int indexSize = in.readInt();
267         byte[] serializedIndex = new byte[indexSize];
268         in.readFully(serializedIndex);
269         
270         index = new SerializedIndex(offsets, serializedIndex, true);
271         keyIndex = new KeyIndex(offsets, serializedIndex);
272       } else if (version == RFile.RINDEX_VER_3) {
273         level = 0;
274         offset = 0;
275         hasNext = false;
276         
277         int size = in.readInt();
278         
279         ByteArrayOutputStream baos = new ByteArrayOutputStream();
280         DataOutputStream dos = new DataOutputStream(baos);
281         ArrayList<Integer> oal = new ArrayList<Integer>();
282         
283         for (int i = 0; i < size; i++) {
284           IndexEntry ie = new IndexEntry(false);
285           oal.add(dos.size());
286           ie.readFields(in);
287           ie.write(dos);
288         }
289         
290         dos.close();
291         
292         int[] oia = new int[oal.size()];
293         for (int i = 0; i < oal.size(); i++) {
294           oia[i] = oal.get(i);
295         }
296         
297         byte[] serializedIndex = baos.toByteArray();
298         index = new SerializedIndex(oia, serializedIndex, false);
299         keyIndex = new KeyIndex(oia, serializedIndex);
300       } else if (version == RFile.RINDEX_VER_4) {
301         level = 0;
302         offset = 0;
303         hasNext = false;
304         
305         int numIndexEntries = in.readInt();
306         int offsets[] = new int[numIndexEntries];
307         for (int i = 0; i < numIndexEntries; i++) {
308           offsets[i] = in.readInt();
309         }
310         
311         int size = in.readInt();
312         byte[] indexData = new byte[size];
313         in.readFully(indexData);
314         
315         index = new SerializedIndex(offsets, indexData, false);
316         keyIndex = new KeyIndex(offsets, indexData);
317       } else {
318         throw new RuntimeException("Unexpected version " + version);
319       }
320       
321     }
322     
323     List<IndexEntry> getIndex() {
324       return index;
325     }
326     
327     public List<Key> getKeyIndex() {
328       return keyIndex;
329     }
330     
331     int getLevel() {
332       return level;
333     }
334     
335     int getOffset() {
336       return offset;
337     }
338     
339     boolean hasNext() {
340       return hasNext;
341     }
342     
343     void setHasNext(boolean b) {
344       this.hasNext = b;
345     }
346     
347   }
348   
349   /**
350    * this class buffers writes to the index so that chunks of index blocks are contiguous in the file instead of having index blocks sprinkled throughout the
351    * file making scans of the entire index slow.
352    */
353   public static class BufferedWriter {
354     
355     private Writer writer;
356     private DataOutputStream buffer;
357     private int buffered;
358     private ByteArrayOutputStream baos;
359     
360     public BufferedWriter(Writer writer) {
361       this.writer = writer;
362       baos = new ByteArrayOutputStream(1 << 20);
363       buffer = new DataOutputStream(baos);
364       buffered = 0;
365     }
366     
367     private void flush() throws IOException {
368       buffer.close();
369       
370       DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
371       
372       IndexEntry ie = new IndexEntry(true);
373       for (int i = 0; i < buffered; i++) {
374         ie.readFields(dis);
375         writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
376       }
377       
378       buffered = 0;
379       baos = new ByteArrayOutputStream(1 << 20);
380       buffer = new DataOutputStream(baos);
381       
382     }
383     
384     public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
385       if (buffer.size() > (10 * 1 << 20)) {
386         flush();
387       }
388       
389       new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
390       buffered++;
391     }
392     
393     public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
394       flush();
395       writer.addLast(key, data, offset, compressedSize, rawSize);
396     }
397     
398     public void close(DataOutput out) throws IOException {
399       writer.close(out);
400     }
401   }
402 
403   public static class Writer {
404     private int threshold;
405     
406     private ArrayList<IndexBlock> levels;
407     
408     private int totalAdded;
409     
410     private boolean addedLast = false;
411     
412     private BlockFileWriter blockFileWriter;
413     
414     Writer(BlockFileWriter blockFileWriter, int maxBlockSize) {
415       this.blockFileWriter = blockFileWriter;
416       this.threshold = maxBlockSize;
417       levels = new ArrayList<IndexBlock>();
418     }
419     
420     private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
421       if (level == levels.size()) {
422         levels.add(new IndexBlock(level, 0));
423       }
424       
425       IndexBlock iblock = levels.get(level);
426       
427       iblock.add(key, data, offset, compressedSize, rawSize);
428     }
429     
430     private void flush(int level, Key lastKey, boolean last) throws IOException {
431       
432       if (last && level == levels.size() - 1)
433         return;
434       
435       IndexBlock iblock = levels.get(level);
436       if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
437         ABlockWriter out = blockFileWriter.prepareDataBlock();
438         iblock.setHasNext(!last);
439         iblock.write(out);
440         out.close();
441         
442         add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
443         flush(level + 1, lastKey, last);
444         
445         if (last)
446           levels.set(level, null);
447         else
448           levels.set(level, new IndexBlock(level, totalAdded));
449       }
450     }
451     
452     public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
453       totalAdded++;
454       add(0, key, data, offset, compressedSize, rawSize);
455       flush(0, key, false);
456     }
457     
458     public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
459       if (addedLast)
460         throw new IllegalStateException("already added last");
461       
462       totalAdded++;
463       add(0, key, data, offset, compressedSize, rawSize);
464       flush(0, key, true);
465       addedLast = true;
466       
467     }
468     
469     public void close(DataOutput out) throws IOException {
470       if (totalAdded > 0 && !addedLast)
471         throw new IllegalStateException("did not call addLast");
472       
473       out.writeInt(totalAdded);
474       // save root node
475       if (levels.size() > 0) {
476         levels.get(levels.size() - 1).write(out);
477       } else {
478         new IndexBlock(0, 0).write(out);
479       }
480       
481     }
482   }
483   
484   public static class Reader {
485     private IndexBlock rootBlock;
486     private BlockFileReader blockStore;
487     private int version;
488     private int size;
489     
490     public class Node {
491       
492       private Node parent;
493       private IndexBlock indexBlock;
494       private int currentPos;
495       
496       Node(Node parent, IndexBlock iBlock) {
497         this.parent = parent;
498         this.indexBlock = iBlock;
499       }
500       
501       Node(IndexBlock rootInfo) {
502         this.parent = null;
503         this.indexBlock = rootInfo;
504       }
505       
506       private Node lookup(Key key) throws IOException {
507         int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
508           @Override
509           public int compare(Key o1, Key o2) {
510             return o1.compareTo(o2);
511           }
512         });
513         
514         if (pos < 0)
515           pos = (pos * -1) - 1;
516         
517         if (pos == indexBlock.getIndex().size()) {
518           if (parent != null)
519             throw new IllegalStateException();
520           this.currentPos = pos;
521           return this;
522         }
523         
524         this.currentPos = pos;
525         
526         if (indexBlock.getLevel() == 0) {
527           return this;
528         }
529         
530         IndexEntry ie = indexBlock.getIndex().get(pos);
531         Node child = new Node(this, getIndexBlock(ie));
532         return child.lookup(key);
533       }
534       
535       private Node getLast() throws IOException {
536         currentPos = indexBlock.getIndex().size() - 1;
537         if (indexBlock.getLevel() == 0)
538           return this;
539         
540         IndexEntry ie = indexBlock.getIndex().get(currentPos);
541         Node child = new Node(this, getIndexBlock(ie));
542         return child.getLast();
543       }
544       
545       private Node getFirst() throws IOException {
546         currentPos = 0;
547         if (indexBlock.getLevel() == 0)
548           return this;
549         
550         IndexEntry ie = indexBlock.getIndex().get(currentPos);
551         Node child = new Node(this, getIndexBlock(ie));
552         return child.getFirst();
553       }
554       
555       private Node getPrevious() throws IOException {
556         if (currentPos == 0)
557           return parent.getPrevious();
558         
559         currentPos--;
560         
561         IndexEntry ie = indexBlock.getIndex().get(currentPos);
562         Node child = new Node(this, getIndexBlock(ie));
563         return child.getLast();
564         
565       }
566       
567       private Node getNext() throws IOException {
568         if (currentPos == indexBlock.getIndex().size() - 1)
569           return parent.getNext();
570         
571         currentPos++;
572         
573         IndexEntry ie = indexBlock.getIndex().get(currentPos);
574         Node child = new Node(this, getIndexBlock(ie));
575         return child.getFirst();
576         
577       }
578       
579       Node getNextNode() throws IOException {
580         return parent.getNext();
581       }
582       
583       Node getPreviousNode() throws IOException {
584         return parent.getPrevious();
585       }
586     }
587     
588     public class IndexIterator implements ListIterator<IndexEntry> {
589       
590       private Node node;
591       private ListIterator<IndexEntry> liter;
592       
593       private Node getPrevNode() {
594         try {
595           return node.getPreviousNode();
596         } catch (IOException e) {
597           throw new RuntimeException(e);
598         }
599       }
600       
601       private Node getNextNode() {
602         try {
603           return node.getNextNode();
604         } catch (IOException e) {
605           throw new RuntimeException(e);
606         }
607       }
608       
609       public IndexIterator() {
610         node = null;
611       }
612       
613       public IndexIterator(Node node) {
614         this.node = node;
615         liter = node.indexBlock.getIndex().listIterator(node.currentPos);
616       }
617       
618       @Override
619       public boolean hasNext() {
620         if (node == null)
621           return false;
622         
623         if (!liter.hasNext()) {
624           return node.indexBlock.hasNext();
625         } else {
626           return true;
627         }
628         
629       }
630       
631       public IndexEntry peekPrevious() {
632         IndexEntry ret = previous();
633         next();
634         return ret;
635       }
636       
637       public IndexEntry peek() {
638         IndexEntry ret = next();
639         previous();
640         return ret;
641       }
642       
643       @Override
644       public IndexEntry next() {
645         if (!liter.hasNext()) {
646           node = getNextNode();
647           liter = node.indexBlock.getIndex().listIterator();
648         }
649         
650         return liter.next();
651       }
652       
653       @Override
654       public boolean hasPrevious() {
655         if (node == null)
656           return false;
657         
658         if (!liter.hasPrevious()) {
659           return node.indexBlock.getOffset() > 0;
660         } else {
661           return true;
662         }
663       }
664       
665       @Override
666       public IndexEntry previous() {
667         if (!liter.hasPrevious()) {
668           node = getPrevNode();
669           liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
670         }
671         
672         return liter.previous();
673       }
674       
675       @Override
676       public int nextIndex() {
677         return node.indexBlock.getOffset() + liter.nextIndex();
678       }
679       
680       @Override
681       public int previousIndex() {
682         return node.indexBlock.getOffset() + liter.previousIndex();
683       }
684       
685       @Override
686       public void remove() {
687         throw new UnsupportedOperationException();
688       }
689       
690       @Override
691       public void set(IndexEntry e) {
692         throw new UnsupportedOperationException();
693         
694       }
695       
696       @Override
697       public void add(IndexEntry e) {
698         throw new UnsupportedOperationException();
699       }
700       
701     }
702     
703     public Reader(BlockFileReader blockStore, int version) {
704       this.version = version;
705       this.blockStore = blockStore;
706     }
707     
708     private IndexBlock getIndexBlock(IndexEntry ie) throws IOException {
709       IndexBlock iblock = new IndexBlock();
710       ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
711       iblock.readFields(in, version);
712       in.close();
713       
714       return iblock;
715     }
716     
717     public IndexIterator lookup(Key key) throws IOException {
718       Node node = new Node(rootBlock);
719       return new IndexIterator(node.lookup(key));
720     }
721     
722     public void readFields(DataInput in) throws IOException {
723       
724       size = 0;
725       
726       if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
727         size = in.readInt();
728       }
729       
730       rootBlock = new IndexBlock();
731       rootBlock.readFields(in, version);
732       
733       if (version == RFile.RINDEX_VER_3 || version == RFile.RINDEX_VER_4) {
734         size = rootBlock.getIndex().size();
735       }
736     }
737     
738     public int size() {
739       return size;
740     }
741     
742     private void getIndexInfo(IndexBlock ib, Map<Integer,Long> sizesByLevel, Map<Integer,Long> countsByLevel) throws IOException {
743       Long size = sizesByLevel.get(ib.getLevel());
744       if (size == null)
745         size = 0l;
746       
747       Long count = countsByLevel.get(ib.getLevel());
748       if (count == null)
749         count = 0l;
750       
751       size += ib.index.sizeInBytes();
752       count++;
753       
754       sizesByLevel.put(ib.getLevel(), size);
755       countsByLevel.put(ib.getLevel(), count);
756       
757       if (ib.getLevel() > 0) {
758         for (IndexEntry ie : ib.index) {
759           IndexBlock cib = getIndexBlock(ie);
760           getIndexInfo(cib, sizesByLevel, countsByLevel);
761         }
762       }
763     }
764     
765     public void getIndexInfo(Map<Integer,Long> sizes, Map<Integer,Long> counts) throws IOException {
766       getIndexInfo(rootBlock, sizes, counts);
767     }
768     
769     public Key getLastKey() {
770       return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
771     }
772   }
773   
774 }