/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.client.mapreduce.lib.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockTabletLocator;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.Credential;
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;

/**
 * A utility class for storing and retrieving {@link Scanner}-related configuration in a Hadoop {@link Configuration}, for use by Accumulo input formats.
 * 
 * @since 1.5.0
 */
public class InputConfigurator extends ConfiguratorBase {
  
  /**
   * Configuration keys for {@link Scanner}.
   * 
   * @since 1.5.0
   */
  public static enum ScanOpts {
    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
  }
  
  /**
   * Configuration keys for various features.
   * 
   * @since 1.5.0
   */
  public static enum Features {
    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
  }
  
  /**
   * Sets the name of the input table, over which this job will scan.
   * 
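   * <p>
   * A minimal usage sketch; {@code AccumuloInputFormat} is shown here as a stand-in for whatever class delegates to this helper:
   * 
   * <pre>
   * {@code
   * Configuration conf = new Configuration();
   * InputConfigurator.setInputTableName(AccumuloInputFormat.class, conf, "table1");
   * }
   * </pre>
   * 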
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param tableName
   *          the name of the input table
   * @since 1.5.0
   */
  public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
    ArgumentChecker.notNull(tableName);
    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
  }
  
  /**
   * Gets the table name from the configuration.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return the table name
   * @since 1.5.0
   * @see #setInputTableName(Class, Configuration, String)
   */
  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
  }
  
  /**
   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
   * 
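   * <p>
   * For example (a sketch; {@code AccumuloInputFormat} again stands in for the implementing class):
   * 
   * <pre>
   * {@code
   * InputConfigurator.setScanAuthorizations(AccumuloInputFormat.class, conf, new Authorizations("public", "project1"));
   * }
   * </pre>
   * 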
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param auths
   *          the user's authorizations
   * @since 1.5.0
   */
  public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
    if (auths != null && !auths.isEmpty())
      conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
  }
  
  /**
   * Gets the authorizations to set for the scans from the configuration.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return the Accumulo scan authorizations
   * @since 1.5.0
   * @see #setScanAuthorizations(Class, Configuration, Authorizations)
   */
  public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
    String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
  }
  
  /**
   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
   * 
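   * <p>
   * For example (a sketch; {@code AccumuloInputFormat} stands in for the implementing class):
   * 
   * <pre>
   * {@code
   * List<Range> ranges = new ArrayList<Range>();
   * ranges.add(new Range("a", "m"));
   * ranges.add(new Range(new Text("n"), new Text("z")));
   * InputConfigurator.setRanges(AccumuloInputFormat.class, conf, ranges);
   * }
   * </pre>
   * 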
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param ranges
   *          the ranges that will be mapped over
   * @since 1.5.0
   */
  public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
    ArgumentChecker.notNull(ranges);
    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
    try {
      for (Range r : ranges) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        r.write(new DataOutputStream(baos));
        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8")));
      }
    } catch (IOException ex) {
      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
    }
    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
  }
  
  /**
   * Gets the ranges to scan over from a job.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return the ranges
   * @throws IOException
   *           if the ranges have been encoded improperly
   * @since 1.5.0
   * @see #setRanges(Class, Configuration, Collection)
   */
  public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
    ArrayList<Range> ranges = new ArrayList<Range>();
    for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Charset.forName("UTF-8"))));
      Range range = new Range();
      range.readFields(new DataInputStream(bais));
      ranges.add(range);
    }
    return ranges;
  }
  
  /**
   * Restricts the columns that will be mapped over for this job.
   * 
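   * <p>
   * For example, to fetch all of column family {@code cf1} plus only {@code cf2:cq2} (a sketch; {@code AccumuloInputFormat} stands in for the implementing
   * class):
   * 
   * <pre>
   * {@code
   * Collection<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>();
   * columns.add(new Pair<Text,Text>(new Text("cf1"), null));
   * columns.add(new Pair<Text,Text>(new Text("cf2"), new Text("cq2")));
   * InputConfigurator.fetchColumns(AccumuloInputFormat.class, conf, columns);
   * }
   * </pre>
   * 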
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param columnFamilyColumnQualifierPairs
   *          pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
   *          selected. An empty set is the default and is equivalent to scanning all columns.
   * @since 1.5.0
   */
  public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
      if (column.getFirst() == null)
        throw new IllegalArgumentException("Column family can not be null");
      
      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Charset.forName("UTF-8"));
      if (column.getSecond() != null)
        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Charset.forName("UTF-8"));
      columnStrings.add(col);
    }
    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
  }
  
  /**
   * Gets the columns to be mapped over from this job.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return a set of columns
   * @since 1.5.0
   * @see #fetchColumns(Class, Configuration, Collection)
   */
  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
    for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
      int idx = col.indexOf(":");
      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(
          Charset.forName("UTF-8"))));
      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(Charset.forName("UTF-8"))));
      columns.add(new Pair<Text,Text>(cf, cq));
    }
    return columns;
  }
  
  /**
   * Encodes an iterator on the input for this job.
   * 
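   * <p>
   * For example, to push a server-side filter into the scan (a sketch; {@code AccumuloInputFormat} and the chosen iterator are illustrative):
   * 
   * <pre>
   * {@code
   * IteratorSetting cfg = new IteratorSetting(50, "ageoff", "org.apache.accumulo.core.iterators.user.AgeOffFilter");
   * cfg.addOption("ttl", "3600000");
   * InputConfigurator.addIterator(AccumuloInputFormat.class, conf, cfg);
   * }
   * </pre>
   * 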
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param cfg
   *          the configuration of the iterator
   * @since 1.5.0
   */
  public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    String newIter;
    try {
      cfg.write(new DataOutputStream(baos));
      newIter = new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8"));
      baos.close();
    } catch (IOException e) {
      throw new IllegalArgumentException("unable to serialize IteratorSetting", e);
    }
    
    // append the new iterator to any previously configured iterators
    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
    if (iterators == null || iterators.isEmpty()) {
      iterators = newIter;
    } else {
      iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
    }
    conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
  }
  
  /**
   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return a list of iterators
   * @since 1.5.0
   * @see #addIterator(Class, Configuration, IteratorSetting)
   */
  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
    
    // if no iterators are configured, return an empty list
    if (iterators == null || iterators.isEmpty())
      return new ArrayList<IteratorSetting>();
    
    // parse the comma-separated, Base64-encoded iterator settings
    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
    try {
      while (tokens.hasMoreTokens()) {
        String itstring = tokens.nextToken();
        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes(Charset.forName("UTF-8"))));
        list.add(new IteratorSetting(new DataInputStream(bais)));
        bais.close();
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("couldn't decode iterator settings", e);
    }
    return list;
  }
  
  /**
   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
   * Disabling this feature will cause exactly one Map task to be created for each specified range.
   * 
   * <p>
   * By default, this feature is <b>enabled</b>.
   * 
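   * <p>
   * For example, to get exactly one Map task per supplied range (a sketch; {@code AccumuloInputFormat} stands in for the implementing class):
   * 
   * <pre>
   * {@code
   * InputConfigurator.setAutoAdjustRanges(AccumuloInputFormat.class, conf, false);
   * }
   * </pre>
   * 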
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @see #setRanges(Class, Configuration, Collection)
   * @since 1.5.0
   */
  public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
    conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
  }
  
  /**
   * Determines whether a configuration has auto-adjust ranges enabled.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return false if the feature is disabled, true otherwise
   * @since 1.5.0
   * @see #setAutoAdjustRanges(Class, Configuration, boolean)
   */
  public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
    return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
  }
  
  /**
   * Controls the use of the {@link IsolatedScanner} in this job.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
  }
  
  /**
   * Determines whether a configuration has isolation enabled.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setScanIsolation(Class, Configuration, boolean)
   */
  public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
  }
  
  /**
   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
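   * <p>
   * For example, to run the iterators added via {@link #addIterator(Class, Configuration, IteratorSetting)} in the Map task instead of on the tablet servers
   * (a sketch; {@code AccumuloInputFormat} stands in for the implementing class):
   * 
   * <pre>
   * {@code
   * InputConfigurator.setLocalIterators(AccumuloInputFormat.class, conf, true);
   * }
   * </pre>
   * 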
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
    conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
  }
  
  /**
   * Determines whether a configuration uses local iterators.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setLocalIterators(Class, Configuration, boolean)
   */
  public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
    return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
  }
  
  /**
   * <p>
   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
   * fail.
   * 
   * <p>
   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
   * 
   * <p>
   * Reading the offline table will create the scan-time iterator stack in the map process. So any iterators that are configured for the table will need to be
   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
   * 
   * <p>
   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
   * 
   * <p>
   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
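   * <p>
   * A sketch of the clone-and-offline workflow described above ({@code conn} is assumed to be a {@link Connector}, and {@code AccumuloInputFormat} stands in
   * for the implementing class):
   * 
   * <pre>
   * {@code
   * conn.tableOperations().clone("table1", "table1_offline_copy", true, null, null);
   * conn.tableOperations().offline("table1_offline_copy");
   * InputConfigurator.setInputTableName(AccumuloInputFormat.class, conf, "table1_offline_copy");
   * InputConfigurator.setOfflineTableScan(AccumuloInputFormat.class, conf, true);
   * }
   * </pre>
   * 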
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
  }
  
  /**
   * Determines whether a configuration has the offline table scan feature enabled.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setOfflineTableScan(Class, Configuration, boolean)
   */
  public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
  }
  
  /**
   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @return an Accumulo tablet locator
   * @throws TableNotFoundException
   *           if the table name set on the configuration doesn't exist
   * @since 1.5.0
   */
  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
    if ("MockInstance".equals(instanceType))
      return new MockTabletLocator();
    Instance instance = getInstance(implementingClass, conf);
    String principal = getPrincipal(implementingClass, conf);
    String tokenClass = getTokenClass(implementingClass, conf);
    byte[] token = getToken(implementingClass, conf);
    String tableName = getInputTableName(implementingClass, conf);
    return TabletLocator.getInstance(instance, new Credential(principal, tokenClass, ByteBuffer.wrap(token), instance.getInstanceID()),
        new Text(Tables.getTableId(instance, tableName)));
  }
  
  /**
   * Checks whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
   * 
   * @param implementingClass
   *          the class whose name will be used as a prefix for the property configuration key
   * @param conf
   *          the Hadoop configuration object to configure
   * @throws IOException
   *           if the context is improperly configured
   * @since 1.5.0
   */
  public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
    if (!isConnectorInfoSet(implementingClass, conf))
      throw new IOException("Input info has not been set.");
    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
    if (!"MockInstance".equals(instanceType) && !"ZooKeeperInstance".equals(instanceType))
      throw new IOException("Instance info has not been set.");
    
    try {
      // ensure the configured principal and token can authenticate and read the input table
      Connector c = getInstance(implementingClass, conf).getConnector(
          new Credential(getPrincipal(implementingClass, conf), getTokenClass(implementingClass, conf), ByteBuffer.wrap(getToken(implementingClass, conf)),
              getInstance(implementingClass, conf).getInstanceID()));
      if (!c.securityOperations().authenticateUser(getPrincipal(implementingClass, conf),
          CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf))))
        throw new IOException("Unable to authenticate user");
      if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
        throw new IOException("Unable to access table");
      
      // unless iterators run client-side, verify that the tablet servers can load each configured iterator
      if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
        }
      }
    } catch (AccumuloException e) {
      throw new IOException(e);
    } catch (AccumuloSecurityException e) {
      throw new IOException(e);
    }
  }
  
}