1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators;
18
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.Comparator;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.TreeMap;
29
30 import org.apache.accumulo.core.client.IteratorSetting;
31 import org.apache.accumulo.core.conf.AccumuloConfiguration;
32 import org.apache.accumulo.core.conf.Property;
33 import org.apache.accumulo.core.data.Key;
34 import org.apache.accumulo.core.data.KeyExtent;
35 import org.apache.accumulo.core.data.Range;
36 import org.apache.accumulo.core.data.thrift.IterInfo;
37 import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
38 import org.apache.accumulo.core.iterators.user.VersioningIterator;
39 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
40 import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
41 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
42 import org.apache.hadoop.io.Writable;
43 import org.apache.hadoop.io.WritableComparable;
44 import org.apache.log4j.Logger;
45 import org.apache.thrift.TDeserializer;
46 import org.apache.thrift.TException;
47 import org.apache.thrift.TSerializer;
48 import org.apache.thrift.protocol.TBinaryProtocol;
49 public class IteratorUtil {
50
51 private static final Logger log = Logger.getLogger(IteratorUtil.class);
52
53 public static enum IteratorScope {
54 majc, minc, scan;
55 }
56
57 public static class IterInfoComparator implements Comparator<IterInfo> {
58
59 @Override
60 public int compare(IterInfo o1, IterInfo o2) {
61 return (o1.priority < o2.priority ? -1 : (o1.priority == o2.priority ? 0 : 1));
62 }
63
64 }
65
66 /**
67 * Generate the initial (default) properties for a table
68 * @param limitVersion
69 * include a VersioningIterator at priority 20 that retains a single version of a given K/V pair.
70 * @return A map of Table properties
71 */
72 public static Map<String,String> generateInitialTableProperties(boolean limitVersion) {
73 TreeMap<String,String> props = new TreeMap<String,String>();
74
75 if (limitVersion) {
76 for (IteratorScope iterScope : IteratorScope.values()) {
77 props.put(Property.TABLE_ITERATOR_PREFIX + iterScope.name() + ".vers", "20," + VersioningIterator.class.getName());
78 props.put(Property.TABLE_ITERATOR_PREFIX + iterScope.name() + ".vers.opt.maxVersions", "1");
79 }
80 }
81
82 return props;
83 }
84
85 public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) {
86 List<IterInfo> iters = new ArrayList<IterInfo>();
87 parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf);
88
89 int max = 0;
90
91 for (IterInfo iterInfo : iters) {
92 if (iterInfo.priority > max)
93 max = iterInfo.priority;
94 }
95
96 return max;
97 }
98
99 private static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
100 for (Entry<String,String> entry : conf) {
101 if (entry.getKey().startsWith(Property.TABLE_ITERATOR_PREFIX.getKey())) {
102
103 String suffix = entry.getKey().substring(Property.TABLE_ITERATOR_PREFIX.getKey().length());
104 String suffixSplit[] = suffix.split("\\.", 4);
105
106 if (!suffixSplit[0].equals(scope.name())) {
107
108
109 boolean found = false;
110 IteratorScope[] scopes = IteratorScope.values();
111 for (IteratorScope s : scopes) {
112 found = found || suffixSplit[0].equals(s.name());
113 }
114
115 if (!found) {
116 log.warn("Option contains unknown scope: " + entry.getKey());
117 }
118
119 continue;
120 }
121
122 if (suffixSplit.length == 2) {
123 String sa[] = entry.getValue().split(",");
124 int prio = Integer.parseInt(sa[0]);
125 String className = sa[1];
126 iters.add(new IterInfo(prio, className, suffixSplit[1]));
127 } else if (suffixSplit.length == 4 && suffixSplit[2].equals("opt")) {
128 String iterName = suffixSplit[1];
129 String optName = suffixSplit[3];
130
131 Map<String,String> options = allOptions.get(iterName);
132 if (options == null) {
133 options = new HashMap<String,String>();
134 allOptions.put(iterName, options);
135 }
136
137 options.put(optName, entry.getValue());
138
139 } else {
140 log.warn("Unrecognizable option: " + entry.getKey());
141 }
142 }
143 }
144
145 Collections.sort(iters, new IterInfoComparator());
146 }
147
148 public static String findIterator(IteratorScope scope, String className, AccumuloConfiguration conf, Map<String,String> opts) {
149 ArrayList<IterInfo> iters = new ArrayList<IterInfo>();
150 Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
151
152 parseIterConf(scope, iters, allOptions, conf);
153
154 for (IterInfo iterInfo : iters)
155 if (iterInfo.className.equals(className)) {
156 Map<String,String> tmpOpts = allOptions.get(iterInfo.iterName);
157 if (tmpOpts != null) {
158 opts.putAll(tmpOpts);
159 }
160 return iterInfo.iterName;
161 }
162
163 return null;
164 }
165
166 public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
167 SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, IteratorEnvironment env) throws IOException {
168 List<IterInfo> emptyList = Collections.emptyList();
169 Map<String,Map<String,String>> emptyMap = Collections.emptyMap();
170 return loadIterators(scope, source, extent, conf, emptyList, emptyMap, env);
171 }
172
173 public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
174 SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IteratorSetting> iterators, IteratorEnvironment env)
175 throws IOException {
176
177 List<IterInfo> ssiList = new ArrayList<IterInfo>();
178 Map<String,Map<String,String>> ssio = new HashMap<String,Map<String,String>>();
179
180 for (IteratorSetting is : iterators) {
181 ssiList.add(new IterInfo(is.getPriority(), is.getIteratorClass(), is.getName()));
182 ssio.put(is.getName(), is.getOptions());
183 }
184
185 return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
186 }
187
188 public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
189 SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
190 IteratorEnvironment env) throws IOException {
191 return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
192 }
193
194 public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
195 SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
196 IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
197 List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
198 Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
199
200 parseIterConf(scope, iters, allOptions, conf);
201
202 for (Entry<String,Map<String,String>> entry : ssio.entrySet()) {
203 if (entry.getValue() == null)
204 continue;
205 Map<String,String> options = allOptions.get(entry.getKey());
206 if (options == null) {
207 allOptions.put(entry.getKey(), entry.getValue());
208 } else {
209 options.putAll(entry.getValue());
210 }
211 }
212
213 return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH));
214 }
215
216 @SuppressWarnings("unchecked")
217 public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
218 Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context)
219 throws IOException {
220
221 SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<K,V>(source);
222
223 try {
224 for (IterInfo iterInfo : iters) {
225
226 Class<? extends SortedKeyValueIterator<K,V>> clazz;
227 if (useAccumuloClassLoader){
228 if (context != null && !context.equals(""))
229 clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.getContextManager().loadClass(context, iterInfo.className,
230 SortedKeyValueIterator.class);
231 else
232 clazz = (Class<? extends SortedKeyValueIterator<K,V>>) AccumuloVFSClassLoader.loadClass(iterInfo.className, SortedKeyValueIterator.class);
233 }else{
234 clazz = (Class<? extends SortedKeyValueIterator<K,V>>) Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
235 }
236 SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
237
238 Map<String,String> options = iterOpts.get(iterInfo.iterName);
239
240 if (options == null)
241 options = Collections.emptyMap();
242
243 skvi.init(prev, options, env);
244 prev = skvi;
245 }
246 } catch (ClassNotFoundException e) {
247 log.error(e.toString());
248 throw new IOException(e);
249 } catch (InstantiationException e) {
250 log.error(e.toString());
251 throw new IOException(e);
252 } catch (IllegalAccessException e) {
253 log.error(e.toString());
254 throw new IOException(e);
255 }
256 return prev;
257 }
258
259 public static Range maximizeStartKeyTimeStamp(Range range) {
260 Range seekRange = range;
261
262 if (range.getStartKey() != null && range.getStartKey().getTimestamp() != Long.MAX_VALUE) {
263 Key seekKey = new Key(seekRange.getStartKey());
264 seekKey.setTimestamp(Long.MAX_VALUE);
265 seekRange = new Range(seekKey, true, range.getEndKey(), range.isEndKeyInclusive());
266 }
267
268 return seekRange;
269 }
270
271 public static Range minimizeEndKeyTimeStamp(Range range) {
272 Range seekRange = range;
273
274 if (range.getEndKey() != null && range.getEndKey().getTimestamp() != Long.MIN_VALUE) {
275 Key seekKey = new Key(seekRange.getEndKey());
276 seekKey.setTimestamp(Long.MIN_VALUE);
277 seekRange = new Range(range.getStartKey(), range.isStartKeyInclusive(), seekKey, true);
278 }
279
280 return seekRange;
281 }
282
283 public static TIteratorSetting toTIteratorSetting(IteratorSetting is) {
284 return new TIteratorSetting(is.getPriority(), is.getName(), is.getIteratorClass(), is.getOptions());
285 }
286
287 public static IteratorSetting toIteratorSetting(TIteratorSetting tis) {
288 return new IteratorSetting(tis.getPriority(), tis.getName(), tis.getIteratorClass(), tis.getProperties());
289 }
290
291 public static IteratorConfig toIteratorConfig(List<IteratorSetting> iterators) {
292 ArrayList<TIteratorSetting> tisList = new ArrayList<TIteratorSetting>();
293
294 for (IteratorSetting iteratorSetting : iterators) {
295 tisList.add(toTIteratorSetting(iteratorSetting));
296 }
297
298 return new IteratorConfig(tisList);
299 }
300
301 public static List<IteratorSetting> toIteratorSettings(IteratorConfig ic) {
302 List<IteratorSetting> ret = new ArrayList<IteratorSetting>();
303 for (TIteratorSetting tIteratorSetting : ic.getIterators()) {
304 ret.add(toIteratorSetting(tIteratorSetting));
305 }
306
307 return ret;
308 }
309
310 public static byte[] encodeIteratorSettings(IteratorConfig iterators) {
311 TSerializer tser = new TSerializer(new TBinaryProtocol.Factory());
312
313 try {
314 return tser.serialize(iterators);
315 } catch (TException e) {
316 throw new RuntimeException(e);
317 }
318 }
319
320 public static byte[] encodeIteratorSettings(List<IteratorSetting> iterators) {
321 return encodeIteratorSettings(toIteratorConfig(iterators));
322 }
323
324
325 public static List<IteratorSetting> decodeIteratorSettings(byte[] enc) {
326 TDeserializer tdser = new TDeserializer(new TBinaryProtocol.Factory());
327 IteratorConfig ic = new IteratorConfig();
328 try {
329 tdser.deserialize(ic, enc);
330 } catch (TException e) {
331 throw new RuntimeException(e);
332 }
333 return toIteratorSettings(ic);
334 }
335 }