View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.file.rfile;
18  
19  import java.io.IOException;
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.concurrent.atomic.AtomicInteger;
23  
24  import org.apache.accumulo.core.data.Key;
25  import org.apache.accumulo.core.data.Value;
26  import org.apache.accumulo.core.file.blockfile.ABlockReader;
27  import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
28  
29  /**
30   * 
31   */
32  public class BlockIndex {
33    
34    public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
35      
36      BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
37      
38      int accessCount = blockIndex.accessCount.incrementAndGet();
39      
40      // 1 is a power of two, but do not care about it
41      if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
42        blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
43      }
44      
45      if (blockIndex.blockIndex != null)
46        return blockIndex;
47  
48      return null;
49    }
50    
51    private static boolean isPowerOfTwo(int x) {
52      return ((x > 0) && (x & (x - 1)) == 0);
53    }
54    
55    private AtomicInteger accessCount = new AtomicInteger(0);
56    private volatile BlockIndexEntry[] blockIndex = null;
57  
58    public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
59      
60      private Key prevKey;
61      private int entriesLeft;
62      private int pos;
63      
64      public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) {
65        this.pos = pos;
66        this.entriesLeft = entriesLeft;
67        this.prevKey = prevKey;
68      }
69  
70      /**
71       * @param key
72       */
73      public BlockIndexEntry(Key key) {
74        this.prevKey = key;
75      }
76  
77  
78      
79      public int getEntriesLeft() {
80        return entriesLeft;
81      }
82  
83      @Override
84      public int compareTo(BlockIndexEntry o) {
85        return prevKey.compareTo(o.prevKey);
86      }
87      
88      @Override
89      public String toString() {
90        return prevKey + " " + entriesLeft + " " + pos;
91      }
92      
93      public Key getPrevKey() {
94        return prevKey;
95      }
96    }
97    
98    public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
99  
100     // get a local ref to the index, another thread could change it
101     BlockIndexEntry[] blockIndex = this.blockIndex;
102     
103     int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
104 
105     int index;
106     
107     if (pos < 0) {
108       if (pos == -1)
109         return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning
110                      // of block
111       index = (pos * -1) - 2;
112     } else {
113       // found exact key in index
114       index = pos;
115       while (index > 0) {
116         if (blockIndex[index].getPrevKey().equals(startKey))
117           index--;
118         else
119           break;
120       }
121     }
122     
123     // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
124     while (index - 1 > 0) {
125       if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey()))
126         index--;
127       else
128         break;
129 
130     }
131     
132     if (index == 0 && blockIndex[index].getPrevKey().equals(startKey))
133       return null;
134 
135     BlockIndexEntry bie = blockIndex[index];
136     cacheBlock.seek(bie.pos);
137     return bie;
138   }
139   
140   private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
141     cacheBlock.seek(0);
142     
143     RelativeKey rk = new RelativeKey();
144     Value val = new Value();
145     
146     int interval = indexEntry.getNumEntries() / indexEntries;
147     
148     if (interval <= 32)
149       return;
150     
151     // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
152     if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
153       return;
154 
155     int count = 0;
156     
157     ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
158 
159     while (count < (indexEntry.getNumEntries() - interval + 1)) {
160 
161       Key myPrevKey = rk.getKey();
162       int pos = cacheBlock.getPosition();
163       rk.readFields(cacheBlock);
164       val.readFields(cacheBlock);
165 
166       if (count > 0 && count % interval == 0) {
167         index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey));
168       }
169       
170       count++;
171     }
172 
173     this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
174 
175     cacheBlock.seek(0);
176   }
177   
178   BlockIndexEntry[] getIndexEntries() {
179     return blockIndex;
180   }
181 }