/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  package org.apache.accumulo.core.iterators;
18  
19  import java.io.IOException;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.Comparator;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.TreeMap;
29  
30  import org.apache.accumulo.core.client.IteratorSetting;
31  import org.apache.accumulo.core.conf.AccumuloConfiguration;
32  import org.apache.accumulo.core.conf.Property;
33  import org.apache.accumulo.core.data.Key;
34  import org.apache.accumulo.core.data.KeyExtent;
35  import org.apache.accumulo.core.data.Range;
36  import org.apache.accumulo.core.data.thrift.IterInfo;
37  import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
38  import org.apache.accumulo.core.iterators.user.VersioningIterator;
39  import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
40  import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
41  import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
42  import org.apache.hadoop.io.Writable;
43  import org.apache.hadoop.io.WritableComparable;
44  import org.apache.log4j.Logger;
45  import org.apache.thrift.TDeserializer;
46  import org.apache.thrift.TException;
47  import org.apache.thrift.TSerializer;
48  import org.apache.thrift.protocol.TBinaryProtocol;
49  public class IteratorUtil {
50    
51    private static final Logger log = Logger.getLogger(IteratorUtil.class);
52    
53    public static enum IteratorScope {
54      majc, minc, scan;
55    }
56    
57    public static class IterInfoComparator implements Comparator<IterInfo> {
58      
59      @Override
60      public int compare(IterInfo o1, IterInfo o2) {
61        return (o1.priority < o2.priority ? -1 : (o1.priority == o2.priority ? 0 : 1));
62      }
63      
64    }
65    
66    /**
67     * Generate the initial (default) properties for a table
68     * @param limitVersion
69     *   include a VersioningIterator at priority 20 that retains a single version of a given K/V pair.
70     * @return A map of Table properties
71     */
72    public static Map<String,String> generateInitialTableProperties(boolean limitVersion) {
73      TreeMap<String,String> props = new TreeMap<String,String>();
74      
75      if (limitVersion) {
76          for (IteratorScope iterScope : IteratorScope.values()) {
77            props.put(Property.TABLE_ITERATOR_PREFIX + iterScope.name() + ".vers", "20," + VersioningIterator.class.getName());
78            props.put(Property.TABLE_ITERATOR_PREFIX + iterScope.name() + ".vers.opt.maxVersions", "1");
79          }
80      }
81      
82      return props;
83    }
84    
85    public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) {
86      List<IterInfo> iters = new ArrayList<IterInfo>();
87      parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf);
88      
89      int max = 0;
90      
91      for (IterInfo iterInfo : iters) {
92        if (iterInfo.priority > max)
93          max = iterInfo.priority;
94      }
95      
96      return max;
97    }
98    
99    private static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
100     for (Entry<String,String> entry : conf) {
101       if (entry.getKey().startsWith(Property.TABLE_ITERATOR_PREFIX.getKey())) {
102         
103         String suffix = entry.getKey().substring(Property.TABLE_ITERATOR_PREFIX.getKey().length());
104         String suffixSplit[] = suffix.split("\\.", 4);
105         
106         if (!suffixSplit[0].equals(scope.name())) {
107           
108           // do a sanity check to see if this is a valid scope
109           boolean found = false;
110           IteratorScope[] scopes = IteratorScope.values();
111           for (IteratorScope s : scopes) {
112             found = found || suffixSplit[0].equals(s.name());
113           }
114           
115           if (!found) {
116             log.warn("Option contains unknown scope: " + entry.getKey());
117           }
118           
119           continue;
120         }
121         
122         if (suffixSplit.length == 2) {
123           String sa[] = entry.getValue().split(",");
124           int prio = Integer.parseInt(sa[0]);
125           String className = sa[1];
126           iters.add(new IterInfo(prio, className, suffixSplit[1]));
127         } else if (suffixSplit.length == 4 && suffixSplit[2].equals("opt")) {
128           String iterName = suffixSplit[1];
129           String optName = suffixSplit[3];
130           
131           Map<String,String> options = allOptions.get(iterName);
132           if (options == null) {
133             options = new HashMap<String,String>();
134             allOptions.put(iterName, options);
135           }
136           
137           options.put(optName, entry.getValue());
138           
139         } else {
140           log.warn("Unrecognizable option: " + entry.getKey());
141         }
142       }
143     }
144     
145     Collections.sort(iters, new IterInfoComparator());
146   }
147   
148   public static String findIterator(IteratorScope scope, String className, AccumuloConfiguration conf, Map<String,String> opts) {
149     ArrayList<IterInfo> iters = new ArrayList<IterInfo>();
150     Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
151     
152     parseIterConf(scope, iters, allOptions, conf);
153     
154     for (IterInfo iterInfo : iters)
155       if (iterInfo.className.equals(className)) {
156         Map<String,String> tmpOpts = allOptions.get(iterInfo.iterName);
157         if (tmpOpts != null) {
158           opts.putAll(tmpOpts);
159         }
160         return iterInfo.iterName;
161       }
162     
163     return null;
164   }
165   
166   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
167       SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, IteratorEnvironment env) throws IOException {
168     List<IterInfo> emptyList = Collections.emptyList();
169     Map<String,Map<String,String>> emptyMap = Collections.emptyMap();
170     return loadIterators(scope, source, extent, conf, emptyList, emptyMap, env);
171   }
172   
173   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
174       SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IteratorSetting> iterators, IteratorEnvironment env)
175       throws IOException {
176     
177     List<IterInfo> ssiList = new ArrayList<IterInfo>();
178     Map<String,Map<String,String>> ssio = new HashMap<String,Map<String,String>>();
179     
180     for (IteratorSetting is : iterators) {
181       ssiList.add(new IterInfo(is.getPriority(), is.getIteratorClass(), is.getName()));
182       ssio.put(is.getName(), is.getOptions());
183     }
184     
185     return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
186   }
187   
188   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
189       SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
190       IteratorEnvironment env) throws IOException {
191     return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
192   }
193   
194   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
195       SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
196       IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
197     List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
198     Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
199     
200     parseIterConf(scope, iters, allOptions, conf);
201     
202     for (Entry<String,Map<String,String>> entry : ssio.entrySet()) {
203       if (entry.getValue() == null)
204         continue;
205       Map<String,String> options = allOptions.get(entry.getKey());
206       if (options == null) {
207         allOptions.put(entry.getKey(), entry.getValue());
208       } else {
209         options.putAll(entry.getValue());
210       }
211     }
212     
213     return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH));
214   }
215   
216   @SuppressWarnings("unchecked")
217   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
218       Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context)
219       throws IOException {
220     // wrap the source in a SynchronizedIterator in case any of the additional configured iterators want to use threading
221     SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<K,V>(source);
222     
223     try {
224       for (IterInfo iterInfo : iters) {
225        
226         Class<? extends SortedKeyValueIterator<K,V>> clazz;
227         if (useAccumuloClassLoader){
228           if (context != null && !context.equals(""))
229             clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.getContextManager().loadClass(context, iterInfo.className,
230                 SortedKeyValueIterator.class);
231           else
232             clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class);
233         }else{
234           clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
235         }
236         SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
237         
238         Map<String,String> options = iterOpts.get(iterInfo.iterName);
239         
240         if (options == null)
241           options = Collections.emptyMap();
242         
243         skvi.init(prev, options, env);
244         prev = skvi;
245       }
246     } catch (ClassNotFoundException e) {
247       log.error(e.toString());
248       throw new IOException(e);
249     } catch (InstantiationException e) {
250       log.error(e.toString());
251       throw new IOException(e);
252     } catch (IllegalAccessException e) {
253       log.error(e.toString());
254       throw new IOException(e);
255     }
256     return prev;
257   }
258   
259   public static Range maximizeStartKeyTimeStamp(Range range) {
260     Range seekRange = range;
261     
262     if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
263       Key seekKey = new Key(seekRange.getStartKey());
264       seekKey.setTimestamp(Long.MAX_VALUE);
265       seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
266     }
267     
268     return seekRange;
269   }
270   
271   public static Range minimizeEndKeyTimeStamp(Range range) {
272     Range seekRange = range;
273     
274     if (range.getEndKey() != null && range.getEndKey().getTimestamp() != Long.MIN_VALUE) {
275       Key seekKey = new Key(seekRange.getEndKey());
276       seekKey.setTimestamp(Long.MIN_VALUE);
277       seekRange = new Range(range.getStartKey(), range.isStartKeyInclusive(), seekKey, true);
278     }
279     
280     return seekRange;
281   }
282   
283   public static TIteratorSetting toTIteratorSetting(IteratorSetting is) {
284     return new TIteratorSetting(is.getPriority(), is.getName(), is.getIteratorClass(), is.getOptions());
285   }
286   
287   public static IteratorSetting toIteratorSetting(TIteratorSetting tis) {
288     return new IteratorSetting(tis.getPriority(), tis.getName(), tis.getIteratorClass(), tis.getProperties());
289   }
290 
291   public static IteratorConfig toIteratorConfig(List<IteratorSetting> iterators) {
292     ArrayList<TIteratorSetting> tisList = new ArrayList<TIteratorSetting>();
293     
294     for (IteratorSetting iteratorSetting : iterators) {
295       tisList.add(toTIteratorSetting(iteratorSetting));
296     }
297     
298     return new IteratorConfig(tisList);
299   }
300   
301   public static List<IteratorSetting> toIteratorSettings(IteratorConfig ic) {
302     List<IteratorSetting> ret = new ArrayList<IteratorSetting>();
303     for (TIteratorSetting tIteratorSetting : ic.getIterators()) {
304       ret.add(toIteratorSetting(tIteratorSetting));
305     }
306     
307     return ret;
308   }
309 
310   public static byte[] encodeIteratorSettings(IteratorConfig iterators) {
311     TSerializer tser = new TSerializer(new TBinaryProtocol.Factory());
312     
313     try {
314       return tser.serialize(iterators);
315     } catch (TException e) {
316       throw new RuntimeException(e);
317     }
318   }
319   
320   public static byte[] encodeIteratorSettings(List<IteratorSetting> iterators) {
321     return encodeIteratorSettings(toIteratorConfig(iterators));
322   }
323 
324 
325   public static List<IteratorSetting> decodeIteratorSettings(byte[] enc) {
326     TDeserializer tdser = new TDeserializer(new TBinaryProtocol.Factory());
327     IteratorConfig ic = new IteratorConfig();
328     try {
329       tdser.deserialize(ic, enc);
330     } catch (TException e) {
331       throw new RuntimeException(e);
332     }
333     return toIteratorSettings(ic);
334   }
335 }