View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.mapreduce.lib.util;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.nio.charset.Charset;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Set;
31  import java.util.StringTokenizer;
32  
33  import org.apache.accumulo.core.Constants;
34  import org.apache.accumulo.core.client.AccumuloException;
35  import org.apache.accumulo.core.client.AccumuloSecurityException;
36  import org.apache.accumulo.core.client.ClientSideIteratorScanner;
37  import org.apache.accumulo.core.client.Connector;
38  import org.apache.accumulo.core.client.Instance;
39  import org.apache.accumulo.core.client.IsolatedScanner;
40  import org.apache.accumulo.core.client.IteratorSetting;
41  import org.apache.accumulo.core.client.Scanner;
42  import org.apache.accumulo.core.client.TableNotFoundException;
43  import org.apache.accumulo.core.client.impl.Tables;
44  import org.apache.accumulo.core.client.impl.TabletLocator;
45  import org.apache.accumulo.core.client.mock.MockTabletLocator;
46  import org.apache.accumulo.core.data.Range;
47  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
48  import org.apache.accumulo.core.security.Authorizations;
49  import org.apache.accumulo.core.security.CredentialHelper;
50  import org.apache.accumulo.core.security.TablePermission;
51  import org.apache.accumulo.core.security.thrift.Credential;
52  import org.apache.accumulo.core.util.ArgumentChecker;
53  import org.apache.accumulo.core.util.Pair;
54  import org.apache.accumulo.core.util.TextUtil;
55  import org.apache.commons.codec.binary.Base64;
56  import org.apache.hadoop.conf.Configuration;
57  import org.apache.hadoop.io.Text;
58  import org.apache.hadoop.util.StringUtils;
59  
60  /**
61   * @since 1.5.0
62   */
63  public class InputConfigurator extends ConfiguratorBase {
64    
65    /**
66     * Configuration keys for {@link Scanner}.
67     * 
68     * @since 1.5.0
69     */
70    public static enum ScanOpts {
71      TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
72    }
73    
74    /**
75     * Configuration keys for various features.
76     * 
77     * @since 1.5.0
78     */
79    public static enum Features {
80      AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
81    }
82    
83    /**
84     * Sets the name of the input table, over which this job will scan.
85     * 
86     * @param implementingClass
87     *          the class whose name will be used as a prefix for the property configuration key
88     * @param conf
89     *          the Hadoop configuration object to configure
90     * @param tableName
91     *          the table to use when the tablename is null in the write call
92     * @since 1.5.0
93     */
94    public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
95      ArgumentChecker.notNull(tableName);
96      conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
97    }
98    
99    /**
100    * Gets the table name from the configuration.
101    * 
102    * @param implementingClass
103    *          the class whose name will be used as a prefix for the property configuration key
104    * @param conf
105    *          the Hadoop configuration object to configure
106    * @return the table name
107    * @since 1.5.0
108    * @see #setInputTableName(Class, Configuration, String)
109    */
110   public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
111     return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
112   }
113   
114   /**
115    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
116    * 
117    * @param implementingClass
118    *          the class whose name will be used as a prefix for the property configuration key
119    * @param conf
120    *          the Hadoop configuration object to configure
121    * @param auths
122    *          the user's authorizations
123    * @since 1.5.0
124    */
125   public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
126     if (auths != null && !auths.isEmpty())
127       conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
128   }
129   
130   /**
131    * Gets the authorizations to set for the scans from the configuration.
132    * 
133    * @param implementingClass
134    *          the class whose name will be used as a prefix for the property configuration key
135    * @param conf
136    *          the Hadoop configuration object to configure
137    * @return the Accumulo scan authorizations
138    * @since 1.5.0
139    * @see #setScanAuthorizations(Class, Configuration, Authorizations)
140    */
141   public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
142     String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
143     return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes());
144   }
145   
146   /**
147    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
148    * 
149    * @param implementingClass
150    *          the class whose name will be used as a prefix for the property configuration key
151    * @param conf
152    *          the Hadoop configuration object to configure
153    * @param ranges
154    *          the ranges that will be mapped over
155    * @since 1.5.0
156    */
157   public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
158     ArgumentChecker.notNull(ranges);
159     ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
160     try {
161       for (Range r : ranges) {
162         ByteArrayOutputStream baos = new ByteArrayOutputStream();
163         r.write(new DataOutputStream(baos));
164         rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8")));
165       }
166     } catch (IOException ex) {
167       throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
168     }
169     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
170   }
171   
172   /**
173    * Gets the ranges to scan over from a job.
174    * 
175    * @param implementingClass
176    *          the class whose name will be used as a prefix for the property configuration key
177    * @param conf
178    *          the Hadoop configuration object to configure
179    * @return the ranges
180    * @throws IOException
181    *           if the ranges have been encoded improperly
182    * @since 1.5.0
183    * @see #setRanges(Class, Configuration, Collection)
184    */
185   public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
186     ArrayList<Range> ranges = new ArrayList<Range>();
187     for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
188       ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Charset.forName("UTF-8"))));
189       Range range = new Range();
190       range.readFields(new DataInputStream(bais));
191       ranges.add(range);
192     }
193     return ranges;
194   }
195   
196   /**
197    * Restricts the columns that will be mapped over for this job.
198    * 
199    * @param implementingClass
200    *          the class whose name will be used as a prefix for the property configuration key
201    * @param conf
202    *          the Hadoop configuration object to configure
203    * @param columnFamilyColumnQualifierPairs
204    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
205    *          selected. An empty set is the default and is equivalent to scanning the all columns.
206    * @since 1.5.0
207    */
208   public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
209     ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
210     ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
211     for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
212       if (column.getFirst() == null)
213         throw new IllegalArgumentException("Column family can not be null");
214       
215       String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Charset.forName("UTF-8"));
216       if (column.getSecond() != null)
217         col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Charset.forName("UTF-8"));
218       columnStrings.add(col);
219     }
220     conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
221   }
222   
223   /**
224    * Gets the columns to be mapped over from this job.
225    * 
226    * @param implementingClass
227    *          the class whose name will be used as a prefix for the property configuration key
228    * @param conf
229    *          the Hadoop configuration object to configure
230    * @return a set of columns
231    * @since 1.5.0
232    * @see #fetchColumns(Class, Configuration, Collection)
233    */
234   public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
235     Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
236     for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
237       int idx = col.indexOf(":");
238       Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(
239           Charset.forName("UTF-8"))));
240       Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
241       columns.add(new Pair<Text,Text>(cf, cq));
242     }
243     return columns;
244   }
245   
246   /**
247    * Encode an iterator on the input for this job.
248    * 
249    * @param implementingClass
250    *          the class whose name will be used as a prefix for the property configuration key
251    * @param conf
252    *          the Hadoop configuration object to configure
253    * @param cfg
254    *          the configuration of the iterator
255    * @since 1.5.0
256    */
257   public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
258     ByteArrayOutputStream baos = new ByteArrayOutputStream();
259     String newIter;
260     try {
261       cfg.write(new DataOutputStream(baos));
262       newIter = new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8"));
263       baos.close();
264     } catch (IOException e) {
265       throw new IllegalArgumentException("unable to serialize IteratorSetting");
266     }
267     
268     String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
269     // No iterators specified yet, create a new string
270     if (iterators == null || iterators.isEmpty()) {
271       iterators = newIter;
272     } else {
273       // append the next iterator & reset
274       iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
275     }
276     // Store the iterators w/ the job
277     conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
278   }
279   
280   /**
281    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
282    * 
283    * @param implementingClass
284    *          the class whose name will be used as a prefix for the property configuration key
285    * @param conf
286    *          the Hadoop configuration object to configure
287    * @return a list of iterators
288    * @since 1.5.0
289    * @see #addIterator(Class, Configuration, IteratorSetting)
290    */
291   public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
292     String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
293     
294     // If no iterators are present, return an empty list
295     if (iterators == null || iterators.isEmpty())
296       return new ArrayList<IteratorSetting>();
297     
298     // Compose the set of iterators encoded in the job configuration
299     StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
300     List<IteratorSetting> list = new ArrayList<IteratorSetting>();
301     try {
302       while (tokens.hasMoreTokens()) {
303         String itstring = tokens.nextToken();
304         ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes()));
305         list.add(new IteratorSetting(new DataInputStream(bais)));
306         bais.close();
307       }
308     } catch (IOException e) {
309       throw new IllegalArgumentException("couldn't decode iterator settings");
310     }
311     return list;
312   }
313   
314   /**
315    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
316    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
317    * 
318    * <p>
319    * By default, this feature is <b>enabled</b>.
320    * 
321    * @param implementingClass
322    *          the class whose name will be used as a prefix for the property configuration key
323    * @param conf
324    *          the Hadoop configuration object to configure
325    * @param enableFeature
326    *          the feature is enabled if true, disabled otherwise
327    * @see #setRanges(Class, Configuration, Collection)
328    * @since 1.5.0
329    */
330   public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
331     conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
332   }
333   
334   /**
335    * Determines whether a configuration has auto-adjust ranges enabled.
336    * 
337    * @param implementingClass
338    *          the class whose name will be used as a prefix for the property configuration key
339    * @param conf
340    *          the Hadoop configuration object to configure
341    * @return false if the feature is disabled, true otherwise
342    * @since 1.5.0
343    * @see #setAutoAdjustRanges(Class, Configuration, boolean)
344    */
345   public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
346     return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
347   }
348   
349   /**
350    * Controls the use of the {@link IsolatedScanner} in this job.
351    * 
352    * <p>
353    * By default, this feature is <b>disabled</b>.
354    * 
355    * @param implementingClass
356    *          the class whose name will be used as a prefix for the property configuration key
357    * @param conf
358    *          the Hadoop configuration object to configure
359    * @param enableFeature
360    *          the feature is enabled if true, disabled otherwise
361    * @since 1.5.0
362    */
363   public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
364     conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
365   }
366   
367   /**
368    * Determines whether a configuration has isolation enabled.
369    * 
370    * @param implementingClass
371    *          the class whose name will be used as a prefix for the property configuration key
372    * @param conf
373    *          the Hadoop configuration object to configure
374    * @return true if the feature is enabled, false otherwise
375    * @since 1.5.0
376    * @see #setScanIsolation(Class, Configuration, boolean)
377    */
378   public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
379     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
380   }
381   
382   /**
383    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
384    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
385    * 
386    * <p>
387    * By default, this feature is <b>disabled</b>.
388    * 
389    * @param implementingClass
390    *          the class whose name will be used as a prefix for the property configuration key
391    * @param conf
392    *          the Hadoop configuration object to configure
393    * @param enableFeature
394    *          the feature is enabled if true, disabled otherwise
395    * @since 1.5.0
396    */
397   public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
398     conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
399   }
400   
401   /**
402    * Determines whether a configuration uses local iterators.
403    * 
404    * @param implementingClass
405    *          the class whose name will be used as a prefix for the property configuration key
406    * @param conf
407    *          the Hadoop configuration object to configure
408    * @return true if the feature is enabled, false otherwise
409    * @since 1.5.0
410    * @see #setLocalIterators(Class, Configuration, boolean)
411    */
412   public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
413     return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
414   }
415   
416   /**
417    * <p>
418    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
419    * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
420    * fail.
421    * 
422    * <p>
423    * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
424    * 
425    * <p>
426    * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
427    * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
428    * 
429    * <p>
430    * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
431    * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
432    * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
433    * 
434    * <p>
435    * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
436    * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
437    * 
438    * <p>
439    * By default, this feature is <b>disabled</b>.
440    * 
441    * @param implementingClass
442    *          the class whose name will be used as a prefix for the property configuration key
443    * @param conf
444    *          the Hadoop configuration object to configure
445    * @param enableFeature
446    *          the feature is enabled if true, disabled otherwise
447    * @since 1.5.0
448    */
449   public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
450     conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
451   }
452   
453   /**
454    * Determines whether a configuration has the offline table scan feature enabled.
455    * 
456    * @param implementingClass
457    *          the class whose name will be used as a prefix for the property configuration key
458    * @param conf
459    *          the Hadoop configuration object to configure
460    * @return true if the feature is enabled, false otherwise
461    * @since 1.5.0
462    * @see #setOfflineTableScan(Class, Configuration, boolean)
463    */
464   public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
465     return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
466   }
467   
468   /**
469    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
470    * 
471    * @param implementingClass
472    *          the class whose name will be used as a prefix for the property configuration key
473    * @param conf
474    *          the Hadoop configuration object to configure
475    * @return an Accumulo tablet locator
476    * @throws TableNotFoundException
477    *           if the table name set on the configuration doesn't exist
478    * @since 1.5.0
479    */
480   public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
481     String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
482     if ("MockInstance".equals(instanceType))
483       return new MockTabletLocator();
484     Instance instance = getInstance(implementingClass, conf);
485     String principal = getPrincipal(implementingClass, conf);
486     String tokenClass = getTokenClass(implementingClass, conf);
487     byte[] token = getToken(implementingClass, conf);
488     String tableName = getInputTableName(implementingClass, conf);
489     return TabletLocator.getInstance(instance, new Credential(principal, tokenClass, ByteBuffer.wrap(token), instance.getInstanceID()),
490         new Text(Tables.getTableId(instance, tableName)));
491   }
492   
493   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
494   /**
495    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
496    * 
497    * @param implementingClass
498    *          the class whose name will be used as a prefix for the property configuration key
499    * @param conf
500    *          the Hadoop configuration object to configure
501    * @throws IOException
502    *           if the context is improperly configured
503    * @since 1.5.0
504    */
505   public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
506     if (!isConnectorInfoSet(implementingClass, conf))
507       throw new IOException("Input info has not been set.");
508     String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
509     if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
510       throw new IOException("Instance info has not been set.");
511     // validate that we can connect as configured
512     try {
513       Connector c = getInstance(implementingClass, conf).getConnector(
514           new Credential(getPrincipal(implementingClass, conf), getTokenClass(implementingClass, conf), ByteBuffer.wrap(getToken(implementingClass, conf)),
515               getInstance(implementingClass, conf).getInstanceID()));
516       if (!c.securityOperations().authenticateUser(getPrincipal(implementingClass, conf), CredentialHelper.extractToken(getTokenClass(implementingClass, conf), getToken(implementingClass, conf))))
517         throw new IOException("Unable to authenticate user");
518       if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
519         throw new IOException("Unable to access table");
520       
521       if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
522         // validate that any scan-time iterators can be loaded by the the tablet servers
523         for (IteratorSetting iter : getIterators(implementingClass, conf)) {
524           if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
525             throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
526         }
527       }
528       
529     } catch (AccumuloException e) {
530       throw new IOException(e);
531     } catch (AccumuloSecurityException e) {
532       throw new IOException(e);
533     }
534   }
535   
536 }