1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.file.rfile;
18
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.accumulo.core.data.Key;
25 import org.apache.accumulo.core.data.Value;
26 import org.apache.accumulo.core.file.blockfile.ABlockReader;
27 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
28
29 /**
30 *
31 */
32 public class BlockIndex {
33
34 public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
35
36 BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
37
38 int accessCount = blockIndex.accessCount.incrementAndGet();
39
40
41 if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
42 blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
43 }
44
45 if (blockIndex.blockIndex != null)
46 return blockIndex;
47
48 return null;
49 }
50
51 private static boolean isPowerOfTwo(int x) {
52 return ((x > 0) && (x & (x - 1)) == 0);
53 }
54
55 private AtomicInteger accessCount = new AtomicInteger(0);
56 private volatile BlockIndexEntry[] blockIndex = null;
57
58 public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
59
60 private Key prevKey;
61 private int entriesLeft;
62 private int pos;
63
64 public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) {
65 this.pos = pos;
66 this.entriesLeft = entriesLeft;
67 this.prevKey = prevKey;
68 }
69
70 /**
71 * @param key
72 */
73 public BlockIndexEntry(Key key) {
74 this.prevKey = key;
75 }
76
77
78
79 public int getEntriesLeft() {
80 return entriesLeft;
81 }
82
83 @Override
84 public int compareTo(BlockIndexEntry o) {
85 return prevKey.compareTo(o.prevKey);
86 }
87
88 @Override
89 public String toString() {
90 return prevKey + " " + entriesLeft + " " + pos;
91 }
92
93 public Key getPrevKey() {
94 return prevKey;
95 }
96 }
97
98 public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
99
100
101 BlockIndexEntry[] blockIndex = this.blockIndex;
102
103 int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
104
105 int index;
106
107 if (pos < 0) {
108 if (pos == -1)
109 return null;
110
111 index = (pos * -1) - 2;
112 } else {
113
114 index = pos;
115 while (index > 0) {
116 if (blockIndex[index].getPrevKey().equals(startKey))
117 index--;
118 else
119 break;
120 }
121 }
122
123
124 while (index - 1 > 0) {
125 if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey()))
126 index--;
127 else
128 break;
129
130 }
131
132 if (index == 0 && blockIndex[index].getPrevKey().equals(startKey))
133 return null;
134
135 BlockIndexEntry bie = blockIndex[index];
136 cacheBlock.seek(bie.pos);
137 return bie;
138 }
139
140 private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
141 cacheBlock.seek(0);
142
143 RelativeKey rk = new RelativeKey();
144 Value val = new Value();
145
146 int interval = indexEntry.getNumEntries() / indexEntries;
147
148 if (interval <= 32)
149 return;
150
151
152 if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
153 return;
154
155 int count = 0;
156
157 ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
158
159 while (count < (indexEntry.getNumEntries() - interval + 1)) {
160
161 Key myPrevKey = rk.getKey();
162 int pos = cacheBlock.getPosition();
163 rk.readFields(cacheBlock);
164 val.readFields(cacheBlock);
165
166 if (count > 0 && count % interval == 0) {
167 index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey));
168 }
169
170 count++;
171 }
172
173 this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
174
175 cacheBlock.seek(0);
176 }
177
178 BlockIndexEntry[] getIndexEntries() {
179 return blockIndex;
180 }
181 }