1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.file.rfile;
18
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.RandomAccess;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.file.blockfile.ABlockReader;
import org.apache.accumulo.core.file.blockfile.ABlockWriter;
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.rfile.bcfile.Utils;
import org.apache.hadoop.io.WritableComparable;
42
43 public class MultiLevelIndex {
44
45 public static class IndexEntry implements WritableComparable<IndexEntry> {
46 private Key key;
47 private int entries;
48 private long offset;
49 private long compressedSize;
50 private long rawSize;
51 private boolean newFormat;
52
53 IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
54 this.key = k;
55 this.entries = e;
56 this.offset = offset;
57 this.compressedSize = compressedSize;
58 this.rawSize = rawSize;
59 newFormat = true;
60 }
61
62 public IndexEntry(boolean newFormat) {
63 this.newFormat = newFormat;
64 }
65
66 @Override
67 public void readFields(DataInput in) throws IOException {
68 key = new Key();
69 key.readFields(in);
70 entries = in.readInt();
71 if (newFormat) {
72 offset = Utils.readVLong(in);
73 compressedSize = Utils.readVLong(in);
74 rawSize = Utils.readVLong(in);
75 } else {
76 offset = -1;
77 compressedSize = -1;
78 rawSize = -1;
79 }
80 }
81
82 @Override
83 public void write(DataOutput out) throws IOException {
84 key.write(out);
85 out.writeInt(entries);
86 if (newFormat) {
87 Utils.writeVLong(out, offset);
88 Utils.writeVLong(out, compressedSize);
89 Utils.writeVLong(out, rawSize);
90 }
91 }
92
93 public Key getKey() {
94 return key;
95 }
96
97 public int getNumEntries() {
98 return entries;
99 }
100
101 public long getOffset() {
102 return offset;
103 }
104
105 public long getCompressedSize() {
106 return compressedSize;
107 }
108
109 public long getRawSize() {
110 return rawSize;
111 }
112
113 @Override
114 public int compareTo(IndexEntry o) {
115 return key.compareTo(o.key);
116 }
117 }
118
119
120 private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
121
122 private int[] offsets;
123 private byte[] data;
124 private boolean newFormat;
125
126 SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
127 this.offsets = offsets;
128 this.data = data;
129 this.newFormat = newFormat;
130 }
131
132 @Override
133 public IndexEntry get(int index) {
134 int len;
135 if (index == offsets.length - 1)
136 len = data.length - offsets[index];
137 else
138 len = offsets[index + 1] - offsets[index];
139
140 ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
141 DataInputStream dis = new DataInputStream(bais);
142
143 IndexEntry ie = new IndexEntry(newFormat);
144 try {
145 ie.readFields(dis);
146 } catch (IOException e) {
147 throw new RuntimeException(e);
148 }
149
150 return ie;
151 }
152
153 @Override
154 public int size() {
155 return offsets.length;
156 }
157
158 public long sizeInBytes() {
159 return data.length + 4 * offsets.length;
160 }
161
162 }
163
164 private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess {
165
166 private int[] offsets;
167 private byte[] data;
168
169 KeyIndex(int[] offsets, byte[] data) {
170 this.offsets = offsets;
171 this.data = data;
172 }
173
174 @Override
175 public Key get(int index) {
176 int len;
177 if (index == offsets.length - 1)
178 len = data.length - offsets[index];
179 else
180 len = offsets[index + 1] - offsets[index];
181
182 ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
183 DataInputStream dis = new DataInputStream(bais);
184
185 Key key = new Key();
186 try {
187 key.readFields(dis);
188 } catch (IOException e) {
189 throw new RuntimeException(e);
190 }
191
192 return key;
193 }
194
195 @Override
196 public int size() {
197 return offsets.length;
198 }
199 }
200
201 static class IndexBlock {
202
203 private ByteArrayOutputStream indexBytes;
204 private DataOutputStream indexOut;
205
206 private ArrayList<Integer> offsets;
207 private int level;
208 private int offset;
209
210 SerializedIndex index;
211 KeyIndex keyIndex;
212 private boolean hasNext;
213
214 public IndexBlock(int level, int totalAdded) {
215
216
217 this.level = level;
218 this.offset = totalAdded;
219
220 indexBytes = new ByteArrayOutputStream();
221 indexOut = new DataOutputStream(indexBytes);
222 offsets = new ArrayList<Integer>();
223 }
224
225 public IndexBlock() {}
226
227 public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
228 offsets.add(indexOut.size());
229 new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
230 }
231
232 int getSize() {
233 return indexOut.size() + 4 * offsets.size();
234 }
235
236 public void write(DataOutput out) throws IOException {
237 out.writeInt(level);
238 out.writeInt(offset);
239 out.writeBoolean(hasNext);
240
241 out.writeInt(offsets.size());
242 for (Integer offset : offsets) {
243 out.writeInt(offset);
244 }
245
246 indexOut.close();
247 byte[] indexData = indexBytes.toByteArray();
248
249 out.writeInt(indexData.length);
250 out.write(indexData);
251 }
252
253 public void readFields(DataInput in, int version) throws IOException {
254
255 if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
256 level = in.readInt();
257 offset = in.readInt();
258 hasNext = in.readBoolean();
259
260 int numOffsets = in.readInt();
261 int[] offsets = new int[numOffsets];
262
263 for (int i = 0; i < numOffsets; i++)
264 offsets[i] = in.readInt();
265
266 int indexSize = in.readInt();
267 byte[] serializedIndex = new byte[indexSize];
268 in.readFully(serializedIndex);
269
270 index = new SerializedIndex(offsets, serializedIndex, true);
271 keyIndex = new KeyIndex(offsets, serializedIndex);
272 } else if (version == RFile.RINDEX_VER_3) {
273 level = 0;
274 offset = 0;
275 hasNext = false;
276
277 int size = in.readInt();
278
279 ByteArrayOutputStream baos = new ByteArrayOutputStream();
280 DataOutputStream dos = new DataOutputStream(baos);
281 ArrayList<Integer> oal = new ArrayList<Integer>();
282
283 for (int i = 0; i < size; i++) {
284 IndexEntry ie = new IndexEntry(false);
285 oal.add(dos.size());
286 ie.readFields(in);
287 ie.write(dos);
288 }
289
290 dos.close();
291
292 int[] oia = new int[oal.size()];
293 for (int i = 0; i < oal.size(); i++) {
294 oia[i] = oal.get(i);
295 }
296
297 byte[] serializedIndex = baos.toByteArray();
298 index = new SerializedIndex(oia, serializedIndex, false);
299 keyIndex = new KeyIndex(oia, serializedIndex);
300 } else if (version == RFile.RINDEX_VER_4) {
301 level = 0;
302 offset = 0;
303 hasNext = false;
304
305 int numIndexEntries = in.readInt();
306 int offsets[] = new int[numIndexEntries];
307 for (int i = 0; i < numIndexEntries; i++) {
308 offsets[i] = in.readInt();
309 }
310
311 int size = in.readInt();
312 byte[] indexData = new byte[size];
313 in.readFully(indexData);
314
315 index = new SerializedIndex(offsets, indexData, false);
316 keyIndex = new KeyIndex(offsets, indexData);
317 } else {
318 throw new RuntimeException("Unexpected version " + version);
319 }
320
321 }
322
323 List<IndexEntry> getIndex() {
324 return index;
325 }
326
327 public List<Key> getKeyIndex() {
328 return keyIndex;
329 }
330
331 int getLevel() {
332 return level;
333 }
334
335 int getOffset() {
336 return offset;
337 }
338
339 boolean hasNext() {
340 return hasNext;
341 }
342
343 void setHasNext(boolean b) {
344 this.hasNext = b;
345 }
346
347 }
348
349 /**
350 * this class buffers writes to the index so that chunks of index blocks are contiguous in the file instead of having index blocks sprinkled throughout the
351 * file making scans of the entire index slow.
352 */
353 public static class BufferedWriter {
354
355 private Writer writer;
356 private DataOutputStream buffer;
357 private int buffered;
358 private ByteArrayOutputStream baos;
359
360 public BufferedWriter(Writer writer) {
361 this.writer = writer;
362 baos = new ByteArrayOutputStream(1 << 20);
363 buffer = new DataOutputStream(baos);
364 buffered = 0;
365 }
366
367 private void flush() throws IOException {
368 buffer.close();
369
370 DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
371
372 IndexEntry ie = new IndexEntry(true);
373 for (int i = 0; i < buffered; i++) {
374 ie.readFields(dis);
375 writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
376 }
377
378 buffered = 0;
379 baos = new ByteArrayOutputStream(1 << 20);
380 buffer = new DataOutputStream(baos);
381
382 }
383
384 public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
385 if (buffer.size() > (10 * 1 << 20)) {
386 flush();
387 }
388
389 new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
390 buffered++;
391 }
392
393 public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
394 flush();
395 writer.addLast(key, data, offset, compressedSize, rawSize);
396 }
397
398 public void close(DataOutput out) throws IOException {
399 writer.close(out);
400 }
401 }
402
403 public static class Writer {
404 private int threshold;
405
406 private ArrayList<IndexBlock> levels;
407
408 private int totalAdded;
409
410 private boolean addedLast = false;
411
412 private BlockFileWriter blockFileWriter;
413
414 Writer(BlockFileWriter blockFileWriter, int maxBlockSize) {
415 this.blockFileWriter = blockFileWriter;
416 this.threshold = maxBlockSize;
417 levels = new ArrayList<IndexBlock>();
418 }
419
420 private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
421 if (level == levels.size()) {
422 levels.add(new IndexBlock(level, 0));
423 }
424
425 IndexBlock iblock = levels.get(level);
426
427 iblock.add(key, data, offset, compressedSize, rawSize);
428 }
429
430 private void flush(int level, Key lastKey, boolean last) throws IOException {
431
432 if (last && level == levels.size() - 1)
433 return;
434
435 IndexBlock iblock = levels.get(level);
436 if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
437 ABlockWriter out = blockFileWriter.prepareDataBlock();
438 iblock.setHasNext(!last);
439 iblock.write(out);
440 out.close();
441
442 add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
443 flush(level + 1, lastKey, last);
444
445 if (last)
446 levels.set(level, null);
447 else
448 levels.set(level, new IndexBlock(level, totalAdded));
449 }
450 }
451
452 public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
453 totalAdded++;
454 add(0, key, data, offset, compressedSize, rawSize);
455 flush(0, key, false);
456 }
457
458 public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
459 if (addedLast)
460 throw new IllegalStateException("already added last");
461
462 totalAdded++;
463 add(0, key, data, offset, compressedSize, rawSize);
464 flush(0, key, true);
465 addedLast = true;
466
467 }
468
469 public void close(DataOutput out) throws IOException {
470 if (totalAdded > 0 && !addedLast)
471 throw new IllegalStateException("did not call addLast");
472
473 out.writeInt(totalAdded);
474
475 if (levels.size() > 0) {
476 levels.get(levels.size() - 1).write(out);
477 } else {
478 new IndexBlock(0, 0).write(out);
479 }
480
481 }
482 }
483
484 public static class Reader {
485 private IndexBlock rootBlock;
486 private BlockFileReader blockStore;
487 private int version;
488 private int size;
489
490 public class Node {
491
492 private Node parent;
493 private IndexBlock indexBlock;
494 private int currentPos;
495
496 Node(Node parent, IndexBlock iBlock) {
497 this.parent = parent;
498 this.indexBlock = iBlock;
499 }
500
501 Node(IndexBlock rootInfo) {
502 this.parent = null;
503 this.indexBlock = rootInfo;
504 }
505
506 private Node lookup(Key key) throws IOException {
507 int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
508 @Override
509 public int compare(Key o1, Key o2) {
510 return o1.compareTo(o2);
511 }
512 });
513
514 if (pos < 0)
515 pos = (pos * -1) - 1;
516
517 if (pos == indexBlock.getIndex().size()) {
518 if (parent != null)
519 throw new IllegalStateException();
520 this.currentPos = pos;
521 return this;
522 }
523
524 this.currentPos = pos;
525
526 if (indexBlock.getLevel() == 0) {
527 return this;
528 }
529
530 IndexEntry ie = indexBlock.getIndex().get(pos);
531 Node child = new Node(this, getIndexBlock(ie));
532 return child.lookup(key);
533 }
534
535 private Node getLast() throws IOException {
536 currentPos = indexBlock.getIndex().size() - 1;
537 if (indexBlock.getLevel() == 0)
538 return this;
539
540 IndexEntry ie = indexBlock.getIndex().get(currentPos);
541 Node child = new Node(this, getIndexBlock(ie));
542 return child.getLast();
543 }
544
545 private Node getFirst() throws IOException {
546 currentPos = 0;
547 if (indexBlock.getLevel() == 0)
548 return this;
549
550 IndexEntry ie = indexBlock.getIndex().get(currentPos);
551 Node child = new Node(this, getIndexBlock(ie));
552 return child.getFirst();
553 }
554
555 private Node getPrevious() throws IOException {
556 if (currentPos == 0)
557 return parent.getPrevious();
558
559 currentPos--;
560
561 IndexEntry ie = indexBlock.getIndex().get(currentPos);
562 Node child = new Node(this, getIndexBlock(ie));
563 return child.getLast();
564
565 }
566
567 private Node getNext() throws IOException {
568 if (currentPos == indexBlock.getIndex().size() - 1)
569 return parent.getNext();
570
571 currentPos++;
572
573 IndexEntry ie = indexBlock.getIndex().get(currentPos);
574 Node child = new Node(this, getIndexBlock(ie));
575 return child.getFirst();
576
577 }
578
579 Node getNextNode() throws IOException {
580 return parent.getNext();
581 }
582
583 Node getPreviousNode() throws IOException {
584 return parent.getPrevious();
585 }
586 }
587
588 public class IndexIterator implements ListIterator<IndexEntry> {
589
590 private Node node;
591 private ListIterator<IndexEntry> liter;
592
593 private Node getPrevNode() {
594 try {
595 return node.getPreviousNode();
596 } catch (IOException e) {
597 throw new RuntimeException(e);
598 }
599 }
600
601 private Node getNextNode() {
602 try {
603 return node.getNextNode();
604 } catch (IOException e) {
605 throw new RuntimeException(e);
606 }
607 }
608
609 public IndexIterator() {
610 node = null;
611 }
612
613 public IndexIterator(Node node) {
614 this.node = node;
615 liter = node.indexBlock.getIndex().listIterator(node.currentPos);
616 }
617
618 @Override
619 public boolean hasNext() {
620 if (node == null)
621 return false;
622
623 if (!liter.hasNext()) {
624 return node.indexBlock.hasNext();
625 } else {
626 return true;
627 }
628
629 }
630
631 public IndexEntry peekPrevious() {
632 IndexEntry ret = previous();
633 next();
634 return ret;
635 }
636
637 public IndexEntry peek() {
638 IndexEntry ret = next();
639 previous();
640 return ret;
641 }
642
643 @Override
644 public IndexEntry next() {
645 if (!liter.hasNext()) {
646 node = getNextNode();
647 liter = node.indexBlock.getIndex().listIterator();
648 }
649
650 return liter.next();
651 }
652
653 @Override
654 public boolean hasPrevious() {
655 if (node == null)
656 return false;
657
658 if (!liter.hasPrevious()) {
659 return node.indexBlock.getOffset() > 0;
660 } else {
661 return true;
662 }
663 }
664
665 @Override
666 public IndexEntry previous() {
667 if (!liter.hasPrevious()) {
668 node = getPrevNode();
669 liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
670 }
671
672 return liter.previous();
673 }
674
675 @Override
676 public int nextIndex() {
677 return node.indexBlock.getOffset() + liter.nextIndex();
678 }
679
680 @Override
681 public int previousIndex() {
682 return node.indexBlock.getOffset() + liter.previousIndex();
683 }
684
685 @Override
686 public void remove() {
687 throw new UnsupportedOperationException();
688 }
689
690 @Override
691 public void set(IndexEntry e) {
692 throw new UnsupportedOperationException();
693
694 }
695
696 @Override
697 public void add(IndexEntry e) {
698 throw new UnsupportedOperationException();
699 }
700
701 }
702
703 public Reader(BlockFileReader blockStore, int version) {
704 this.version = version;
705 this.blockStore = blockStore;
706 }
707
708 private IndexBlock getIndexBlock(IndexEntry ie) throws IOException {
709 IndexBlock iblock = new IndexBlock();
710 ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
711 iblock.readFields(in, version);
712 in.close();
713
714 return iblock;
715 }
716
717 public IndexIterator lookup(Key key) throws IOException {
718 Node node = new Node(rootBlock);
719 return new IndexIterator(node.lookup(key));
720 }
721
722 public void readFields(DataInput in) throws IOException {
723
724 size = 0;
725
726 if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
727 size = in.readInt();
728 }
729
730 rootBlock = new IndexBlock();
731 rootBlock.readFields(in, version);
732
733 if (version == RFile.RINDEX_VER_3 || version == RFile.RINDEX_VER_4) {
734 size = rootBlock.getIndex().size();
735 }
736 }
737
738 public int size() {
739 return size;
740 }
741
742 private void getIndexInfo(IndexBlock ib, Map<Integer,Long> sizesByLevel, Map<Integer,Long> countsByLevel) throws IOException {
743 Long size = sizesByLevel.get(ib.getLevel());
744 if (size == null)
745 size = 0l;
746
747 Long count = countsByLevel.get(ib.getLevel());
748 if (count == null)
749 count = 0l;
750
751 size += ib.index.sizeInBytes();
752 count++;
753
754 sizesByLevel.put(ib.getLevel(), size);
755 countsByLevel.put(ib.getLevel(), count);
756
757 if (ib.getLevel() > 0) {
758 for (IndexEntry ie : ib.index) {
759 IndexBlock cib = getIndexBlock(ie);
760 getIndexInfo(cib, sizesByLevel, countsByLevel);
761 }
762 }
763 }
764
765 public void getIndexInfo(Map<Integer,Long> sizes, Map<Integer,Long> counts) throws IOException {
766 getIndexInfo(rootBlock, sizes, counts);
767 }
768
769 public Key getLastKey() {
770 return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
771 }
772 }
773
774 }