View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.mapreduce;
19  import;
20  import;
21  import;
22  import;
23  import java.math.BigInteger;
24  import;
25  import;
26  import;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.Set;
36  import java.util.StringTokenizer;
38  import org.apache.accumulo.core.Constants;
39  import org.apache.accumulo.core.client.AccumuloException;
40  import org.apache.accumulo.core.client.AccumuloSecurityException;
41  import org.apache.accumulo.core.client.ClientSideIteratorScanner;
42  import org.apache.accumulo.core.client.Connector;
43  import org.apache.accumulo.core.client.Instance;
44  import org.apache.accumulo.core.client.IsolatedScanner;
45  import org.apache.accumulo.core.client.IteratorSetting;
46  import org.apache.accumulo.core.client.RowIterator;
47  import org.apache.accumulo.core.client.Scanner;
48  import org.apache.accumulo.core.client.TableDeletedException;
49  import org.apache.accumulo.core.client.TableNotFoundException;
50  import org.apache.accumulo.core.client.TableOfflineException;
51  import org.apache.accumulo.core.client.ZooKeeperInstance;
52  import org.apache.accumulo.core.client.impl.OfflineScanner;
53  import org.apache.accumulo.core.client.impl.Tables;
54  import org.apache.accumulo.core.client.impl.TabletLocator;
55  import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
56  import org.apache.accumulo.core.client.mock.MockInstance;
57  import;
58  import;
59  import;
60  import;
61  import;
62  import;
63  import org.apache.accumulo.core.iterators.user.VersioningIterator;
64  import org.apache.accumulo.core.master.state.tables.TableState;
65  import;
66  import;
67  import;
68  import;
69  import;
70  import org.apache.accumulo.core.util.Pair;
71  import org.apache.accumulo.core.util.UtilWaitThread;
72  import org.apache.hadoop.conf.Configuration;
73  import org.apache.hadoop.filecache.DistributedCache;
74  import org.apache.hadoop.fs.Path;
75  import;
76  import;
77  import org.apache.hadoop.mapreduce.InputFormat;
78  import org.apache.hadoop.mapreduce.InputSplit;
79  import org.apache.hadoop.mapreduce.Job;
80  import org.apache.hadoop.mapreduce.JobContext;
81  import org.apache.hadoop.mapreduce.RecordReader;
82  import org.apache.hadoop.mapreduce.TaskAttemptContext;
83  import org.apache.log4j.Level;
84  import org.apache.log4j.Logger;
86  /**
87   * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
88   * <p>
89   * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
90   * <p>
91   * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
92   * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V.
93   * <p>
94   * See {@link AccumuloInputFormat} for an example implementation.
95   */
96  public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
98    private static final Class<?> CLASS = AccumuloInputFormat.class;
99    protected static final Logger log = Logger.getLogger(CLASS);
101   /**
102    * Sets the connector information needed to communicate with Accumulo in this job.
103    * 
104    * <p>
105    * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
106    * conversion to a string, and is not intended to be secure.
107    * 
108    * @param job
109    *          the Hadoop job instance to be configured
110    * @param principal
111    *          a valid Accumulo user name (user must have Table.CREATE permission)
112    * @param token
113    *          the user's password
114    * @throws AccumuloSecurityException 
115    * @since 1.5.0
116    */
117   public static void setConnectorInfo(Job job, String principal, SecurityToken token) throws AccumuloSecurityException {
118     InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
119   }
121   /**
122    * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when
123    * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the
124    * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using
125    * {@link TokenHelper#asBase64String(AccumuloToken)}.
126    * 
127    * @param job
128    *          the Hadoop job instance to be configured
129    * @param path
130    *          the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
131    * @since 1.5.0
132    */
133   public static void setConnectorInfo(Job job, Path path) {
134     InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path);
135   }
137   /**
138    * Determines if the connector has been configured.
139    * 
140    * @param context
141    *          the Hadoop context for the configured job
142    * @return true if the connector has been configured, false otherwise
143    * @since 1.5.0
144    * @see #setConnectorInfo(Job, String, byte[])
145    * @see #setConnectorInfo(Job, Path)
146    */
147   protected static Boolean isConnectorInfoSet(JobContext context) {
148     return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
149   }
151   /**
152    * Gets the user name from the configuration.
153    * 
154    * @param context
155    *          the Hadoop context for the configured job
156    * @return the user name
157    * @since 1.5.0
158    * @see #setConnectorInfo(Job, String, SecurityToken)
159    * @see #setConnectorInfo(Job, Path)
160    */
161   protected static String getPrincipal(JobContext context) {
162     return InputConfigurator.getPrincipal(CLASS, context.getConfiguration());
163   }
165   /**
166    * Gets the serialized token class from the configuration.
167    * 
168    * @param context
169    *          the Hadoop context for the configured job
170    * @return the user name
171    * @since 1.5.0
172    * @see #setConnectorInfo(Job, String, SecurityToken)
173    * @see #setConnectorInfo(Job, Path)
174    */
175   protected static String getTokenClass(JobContext context) {
176     return InputConfigurator.getTokenClass(CLASS, context.getConfiguration());
177   }
179   /**
180    * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
181    * provide a charset safe conversion to a string, and is not intended to be secure.
182    * 
183    * @param context
184    *          the Hadoop context for the configured job
185    * @return the decoded user password
186    * @since 1.5.0
187    * @see #setConnectorInfo(Job, String, byte[])
188    */
189   protected static byte[] getToken(JobContext context) {
190     return InputConfigurator.getToken(CLASS, context.getConfiguration());
191   }
193   /**
194    * Configures a {@link ZooKeeperInstance} for this job.
195    * 
196    * @param job
197    *          the Hadoop job instance to be configured
198    * @param instanceName
199    *          the Accumulo instance name
200    * @param zooKeepers
201    *          a comma-separated list of zookeeper servers
202    * @since 1.5.0
203    */
204   public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
205     InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
206   }
208   /**
209    * Configures a {@link MockInstance} for this job.
210    * 
211    * @param job
212    *          the Hadoop job instance to be configured
213    * @param instanceName
214    *          the Accumulo instance name
215    * @since 1.5.0
216    */
217   public static void setMockInstance(Job job, String instanceName) {
218     InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
219   }
221   /**
222    * Initializes an Accumulo {@link Instance} based on the configuration.
223    * 
224    * @param context
225    *          the Hadoop context for the configured job
226    * @return an Accumulo instance
227    * @since 1.5.0
228    * @see #setZooKeeperInstance(Job, String, String)
229    * @see #setMockInstance(Job, String)
230    */
231   protected static Instance getInstance(JobContext context) {
232     return InputConfigurator.getInstance(CLASS, context.getConfiguration());
233   }
235   /**
236    * Sets the log level for this job.
237    * 
238    * @param job
239    *          the Hadoop job instance to be configured
240    * @param level
241    *          the logging level
242    * @since 1.5.0
243    */
244   public static void setLogLevel(Job job, Level level) {
245     InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
246   }
248   /**
249    * Gets the log level from this configuration.
250    * 
251    * @param context
252    *          the Hadoop context for the configured job
253    * @return the log level
254    * @since 1.5.0
255    * @see #setLogLevel(Job, Level)
256    */
257   protected static Level getLogLevel(JobContext context) {
258     return InputConfigurator.getLogLevel(CLASS, context.getConfiguration());
259   }
261   /**
262    * Sets the name of the input table, over which this job will scan.
263    * 
264    * @param job
265    *          the Hadoop job instance to be configured
266    * @param tableName
267    *          the table to use when the tablename is null in the write call
268    * @since 1.5.0
269    */
270   public static void setInputTableName(Job job, String tableName) {
271     InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
272   }
274   /**
275    * Gets the table name from the configuration.
276    * 
277    * @param context
278    *          the Hadoop context for the configured job
279    * @return the table name
280    * @since 1.5.0
281    * @see #setInputTableName(Job, String)
282    */
283   protected static String getInputTableName(JobContext context) {
284     return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
285   }
287   /**
288    * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
289    * 
290    * @param job
291    *          the Hadoop job instance to be configured
292    * @param auths
293    *          the user's authorizations
294    * @since 1.5.0
295    */
296   public static void setScanAuthorizations(Job job, Authorizations auths) {
297     InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
298   }
300   /**
301    * Gets the authorizations to set for the scans from the configuration.
302    * 
303    * @param context
304    *          the Hadoop context for the configured job
305    * @return the Accumulo scan authorizations
306    * @since 1.5.0
307    * @see #setScanAuthorizations(Job, Authorizations)
308    */
309   protected static Authorizations getScanAuthorizations(JobContext context) {
310     return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
311   }
313   /**
314    * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
315    * 
316    * @param job
317    *          the Hadoop job instance to be configured
318    * @param ranges
319    *          the ranges that will be mapped over
320    * @since 1.5.0
321    */
322   public static void setRanges(Job job, Collection<Range> ranges) {
323     InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
324   }
326   /**
327    * Gets the ranges to scan over from a job.
328    * 
329    * @param context
330    *          the Hadoop context for the configured job
331    * @return the ranges
332    * @throws IOException
333    *           if the ranges have been encoded improperly
334    * @since 1.5.0
335    * @see #setRanges(Job, Collection)
336    */
337   protected static List<Range> getRanges(JobContext context) throws IOException {
338     return InputConfigurator.getRanges(CLASS, context.getConfiguration());
339   }
341   /**
342    * Restricts the columns that will be mapped over for this job.
343    * 
344    * @param job
345    *          the Hadoop job instance to be configured
346    * @param columnFamilyColumnQualifierPairs
347    *          a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
348    *          selected. An empty set is the default and is equivalent to scanning the all columns.
349    * @since 1.5.0
350    */
351   public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
352     InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
353   }
355   /**
356    * Gets the columns to be mapped over from this job.
357    * 
358    * @param context
359    *          the Hadoop context for the configured job
360    * @return a set of columns
361    * @since 1.5.0
362    * @see #fetchColumns(Job, Collection)
363    */
364   protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
365     return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
366   }
368   /**
369    * Encode an iterator on the input for this job.
370    * 
371    * @param job
372    *          the Hadoop job instance to be configured
373    * @param cfg
374    *          the configuration of the iterator
375    * @since 1.5.0
376    */
377   public static void addIterator(Job job, IteratorSetting cfg) {
378     InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
379   }
381   /**
382    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
383    * 
384    * @param context
385    *          the Hadoop context for the configured job
386    * @return a list of iterators
387    * @since 1.5.0
388    * @see #addIterator(Job, IteratorSetting)
389    */
390   protected static List<IteratorSetting> getIterators(JobContext context) {
391     return InputConfigurator.getIterators(CLASS, context.getConfiguration());
392   }
394   /**
395    * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
396    * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
397    * 
398    * <p>
399    * By default, this feature is <b>enabled</b>.
400    * 
401    * @param job
402    *          the Hadoop job instance to be configured
403    * @param enableFeature
404    *          the feature is enabled if true, disabled otherwise
405    * @see #setRanges(Job, Collection)
406    * @since 1.5.0
407    */
408   public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
409     InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
410   }
412   /**
413    * Determines whether a configuration has auto-adjust ranges enabled.
414    * 
415    * @param context
416    *          the Hadoop context for the configured job
417    * @return false if the feature is disabled, true otherwise
418    * @since 1.5.0
419    * @see #setAutoAdjustRanges(Job, boolean)
420    */
421   protected static boolean getAutoAdjustRanges(JobContext context) {
422     return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
423   }
425   /**
426    * Controls the use of the {@link IsolatedScanner} in this job.
427    * 
428    * <p>
429    * By default, this feature is <b>disabled</b>.
430    * 
431    * @param job
432    *          the Hadoop job instance to be configured
433    * @param enableFeature
434    *          the feature is enabled if true, disabled otherwise
435    * @since 1.5.0
436    */
437   public static void setScanIsolation(Job job, boolean enableFeature) {
438     InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
439   }
441   /**
442    * Determines whether a configuration has isolation enabled.
443    * 
444    * @param context
445    *          the Hadoop context for the configured job
446    * @return true if the feature is enabled, false otherwise
447    * @since 1.5.0
448    * @see #setScanIsolation(Job, boolean)
449    */
450   protected static boolean isIsolated(JobContext context) {
451     return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
452   }
454   /**
455    * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
456    * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
457    * 
458    * <p>
459    * By default, this feature is <b>disabled</b>.
460    * 
461    * @param job
462    *          the Hadoop job instance to be configured
463    * @param enableFeature
464    *          the feature is enabled if true, disabled otherwise
465    * @since 1.5.0
466    */
467   public static void setLocalIterators(Job job, boolean enableFeature) {
468     InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
469   }
471   /**
472    * Determines whether a configuration uses local iterators.
473    * 
474    * @param context
475    *          the Hadoop context for the configured job
476    * @return true if the feature is enabled, false otherwise
477    * @since 1.5.0
478    * @see #setLocalIterators(Job, boolean)
479    */
480   protected static boolean usesLocalIterators(JobContext context) {
481     return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
482   }
484   /**
485    * <p>
486    * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
487    * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
488    * fail.
489    * 
490    * <p>
491    * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
492    * 
493    * <p>
494    * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
495    * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
496    * 
497    * <p>
498    * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
499    * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
500    * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
501    * 
502    * <p>
503    * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
504    * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
505    * 
506    * <p>
507    * By default, this feature is <b>disabled</b>.
508    * 
509    * @param job
510    *          the Hadoop job instance to be configured
511    * @param enableFeature
512    *          the feature is enabled if true, disabled otherwise
513    * @since 1.5.0
514    */
515   public static void setOfflineTableScan(Job job, boolean enableFeature) {
516     InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
517   }
519   /**
520    * Determines whether a configuration has the offline table scan feature enabled.
521    * 
522    * @param context
523    *          the Hadoop context for the configured job
524    * @return true if the feature is enabled, false otherwise
525    * @since 1.5.0
526    * @see #setOfflineTableScan(Job, boolean)
527    */
528   protected static boolean isOfflineScan(JobContext context) {
529     return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
530   }
532   /**
533    * Initializes an Accumulo {@link TabletLocator} based on the configuration.
534    * 
535    * @param context
536    *          the Hadoop context for the configured job
537    * @return an Accumulo tablet locator
538    * @throws TableNotFoundException
539    *           if the table name set on the configuration doesn't exist
540    * @since 1.5.0
541    */
542   protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
543     return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration());
544   }
546   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
547   /**
548    * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
549    * 
550    * @param context
551    *          the Hadoop context for the configured job
552    * @throws IOException
553    *           if the context is improperly configured
554    * @since 1.5.0
555    */
556   protected static void validateOptions(JobContext context) throws IOException {
557     InputConfigurator.validateOptions(CLASS, context.getConfiguration());
558   }
560   /**
561    * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
562    * types.
563    * 
564    * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
565    * <ul>
566    * <li>K {@link #currentK}</li>
567    * <li>V {@link #currentV}</li>
568    * <li>Key {@link #currentKey} (used for progress reporting)</li>
569    * <li>int {@link #numKeysRead} (used for progress reporting)</li>
570    * </ul>
571    */
572   protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
573     protected long numKeysRead;
574     protected Iterator<Entry<Key,Value>> scannerIterator;
575     protected RangeInputSplit split;
577     /**
578      * Apply the configured iterators from the configuration to the scanner.
579      * 
580      * @param context
581      *          the Hadoop context for the configured job
582      * @param scanner
583      *          the scanner to configure
584      */
585     protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
586       List<IteratorSetting> iterators = getIterators(context);
587       for (IteratorSetting iterator : iterators) {
588         scanner.addScanIterator(iterator);
589       }
590     }
592     /**
593      * Initialize a scanner over the given input split using this task attempt configuration.
594      */
595     @Override
596     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
597       Scanner scanner;
598       split = (RangeInputSplit) inSplit;
599       log.debug("Initializing input split: " + split.range);
600       Instance instance = getInstance(attempt);
601       String principal = getPrincipal(attempt);
602       String tokenClass = getTokenClass(attempt);
603       byte[] token = getToken(attempt);
604       Authorizations authorizations = getScanAuthorizations(attempt);
606       try {
607         log.debug("Creating connector with user: " + principal);
608         Connector conn = instance.getConnector(principal, CredentialHelper.extractToken(tokenClass, token));
609         log.debug("Creating scanner for table: " + getInputTableName(attempt));
610         log.debug("Authorizations are: " + authorizations);
611         if (isOfflineScan(attempt)) {
612           scanner = new OfflineScanner(instance, new Credential(principal, tokenClass, ByteBuffer.wrap(token), instance.getInstanceID()), Tables.getTableId(instance,
613               getInputTableName(attempt)), authorizations);
614         } else {
615           scanner = conn.createScanner(getInputTableName(attempt), authorizations);
616         }
617         if (isIsolated(attempt)) {
618 "Creating isolated scanner");
619           scanner = new IsolatedScanner(scanner);
620         }
621         if (usesLocalIterators(attempt)) {
622 "Using local iterators");
623           scanner = new ClientSideIteratorScanner(scanner);
624         }
625         setupIterators(attempt, scanner);
626       } catch (Exception e) {
627         throw new IOException(e);
628       }
630       // setup a scanner within the bounds of this split
631       for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
632         if (c.getSecond() != null) {
633           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
634           scanner.fetchColumn(c.getFirst(), c.getSecond());
635         } else {
636           log.debug("Fetching column family " + c.getFirst());
637           scanner.fetchColumnFamily(c.getFirst());
638         }
639       }
641       scanner.setRange(split.range);
643       numKeysRead = 0;
645       // do this last after setting all scanner options
646       scannerIterator = scanner.iterator();
647     }
649     @Override
650     public void close() {}
652     @Override
653     public float getProgress() throws IOException {
654       if (numKeysRead > 0 && currentKey == null)
655         return 1.0f;
656       return split.getProgress(currentKey);
657     }
659     protected K currentK = null;
660     protected V currentV = null;
661     protected Key currentKey = null;
662     protected Value currentValue = null;
664     @Override
665     public K getCurrentKey() throws IOException, InterruptedException {
666       return currentK;
667     }
669     @Override
670     public V getCurrentValue() throws IOException, InterruptedException {
671       return currentV;
672     }
673   }
675   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
676       AccumuloException, AccumuloSecurityException {
678     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
680     Instance instance = getInstance(context);
681     Connector conn = instance.getConnector(getPrincipal(context), getToken(context));
682     String tableId = Tables.getTableId(instance, tableName);
684     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
685       Tables.clearCache(instance);
686       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
687         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
688       }
689     }
691     for (Range range : ranges) {
692       Text startRow;
694       if (range.getStartKey() != null)
695         startRow = range.getStartKey().getRow();
696       else
697         startRow = new Text();
699       Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
700       Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
701       Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
702       scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
703       scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
704       scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
705       scanner.setRange(metadataRange);
707       RowIterator rowIter = new RowIterator(scanner);
709       // TODO check that extents match prev extent
711       KeyExtent lastExtent = null;
713       while (rowIter.hasNext()) {
714         Iterator<Entry<Key,Value>> row =;
715         String last = "";
716         KeyExtent extent = null;
717         String location = null;
719         while (row.hasNext()) {
720           Entry<Key,Value> entry =;
721           Key key = entry.getKey();
723           if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
724             last = entry.getValue().toString();
725           }
727           if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
728               || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
729             location = entry.getValue().toString();
730           }
732           if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
733             extent = new KeyExtent(key.getRow(), entry.getValue());
734           }
736         }
738         if (location != null)
739           return null;
741         if (!extent.getTableId().toString().equals(tableId)) {
742           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
743         }
745         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
746           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
747         }
749         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
750         if (tabletRanges == null) {
751           tabletRanges = new HashMap<KeyExtent,List<Range>>();
752           binnedRanges.put(last, tabletRanges);
753         }
755         List<Range> rangeList = tabletRanges.get(extent);
756         if (rangeList == null) {
757           rangeList = new ArrayList<Range>();
758           tabletRanges.put(extent, rangeList);
759         }
761         rangeList.add(range);
763         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
764           break;
765         }
767         lastExtent = extent;
768       }
770     }
772     return binnedRanges;
773   }
775   /**
776    * Read the metadata table to get tablets and match up ranges to them.
777    */
778   @Override
779   public List<InputSplit> getSplits(JobContext context) throws IOException {
780     log.setLevel(getLogLevel(context));
781     validateOptions(context);
783     String tableName = getInputTableName(context);
784     boolean autoAdjust = getAutoAdjustRanges(context);
785     List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
787     if (ranges.isEmpty()) {
788       ranges = new ArrayList<Range>(1);
789       ranges.add(new Range());
790     }
792     // get the metadata information for these ranges
793     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
794     TabletLocator tl;
795     try {
796       if (isOfflineScan(context)) {
797         binnedRanges = binOfflineTable(context, tableName, ranges);
798         while (binnedRanges == null) {
799           // Some tablets were still online, try again
800           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
801           binnedRanges = binOfflineTable(context, tableName, ranges);
802         }
803       } else {
804         Instance instance = getInstance(context);
805         String tableId = null;
806         tl = getTabletLocator(context);
807         // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
808         tl.invalidateCache();
809         while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
810           if (!(instance instanceof MockInstance)) {
811             if (tableId == null)
812               tableId = Tables.getTableId(instance, tableName);
813             if (!Tables.exists(instance, tableId))
814               throw new TableDeletedException(tableId);
815             if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
816               throw new TableOfflineException(instance, tableId);
817           }
818           binnedRanges.clear();
819           log.warn("Unable to locate bins for specified ranges. Retrying.");
820           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
821           tl.invalidateCache();
822         }
823       }
824     } catch (Exception e) {
825       throw new IOException(e);
826     }
828     ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
829     HashMap<Range,ArrayList<String>> splitsToAdd = null;
831     if (!autoAdjust)
832       splitsToAdd = new HashMap<Range,ArrayList<String>>();
834     HashMap<String,String> hostNameCache = new HashMap<String,String>();
836     for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
837       String ip = tserverBin.getKey().split(":", 2)[0];
838       String location = hostNameCache.get(ip);
839       if (location == null) {
840         InetAddress inetAddress = InetAddress.getByName(ip);
841         location = inetAddress.getHostName();
842         hostNameCache.put(ip, location);
843       }
845       for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
846         Range ke = extentRanges.getKey().toDataRange();
847         for (Range r : extentRanges.getValue()) {
848           if (autoAdjust) {
849             // divide ranges into smaller ranges, based on the tablets
850             splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
851           } else {
852             // don't divide ranges
853             ArrayList<String> locations = splitsToAdd.get(r);
854             if (locations == null)
855               locations = new ArrayList<String>(1);
856             locations.add(location);
857             splitsToAdd.put(r, locations);
858           }
859         }
860       }
861     }
863     if (!autoAdjust)
864       for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
865         splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
866     return splits;
867   }
869   /**
870    * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
871    */
872   public static class RangeInputSplit extends InputSplit implements Writable {
873     private Range range;
874     private String[] locations;
876     public RangeInputSplit() {
877       range = new Range();
878       locations = new String[0];
879     }
881     public RangeInputSplit(RangeInputSplit split) throws IOException {
882       this.setRange(split.getRange());
883       this.setLocations(split.getLocations());
884     }
886     protected RangeInputSplit(String table, Range range, String[] locations) {
887       this.range = range;
888       this.locations = locations;
889     }
891     public Range getRange() {
892       return range;
893     }
895     public void setRange(Range range) {
896       this.range = range;
897     }
899     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
900       byte[] bytes = new byte[numBytes + 1];
901       bytes[0] = 0;
902       for (int i = 0; i < numBytes; i++) {
903         if (i >= seq.length())
904           bytes[i + 1] = 0;
905         else
906           bytes[i + 1] = seq.byteAt(i);
907       }
908       return bytes;
909     }
911     public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
912       int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
913       BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
914       BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
915       BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
916       return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
917     }
919     public float getProgress(Key currentKey) {
920       if (currentKey == null)
921         return 0f;
922       if (range.getStartKey() != null && range.getEndKey() != null) {
923         if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW)) {
924           // just look at the row progress
925           return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
926         } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM)) {
927           // just look at the column family progress
928           return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
929         } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
930           // just look at the column qualifier progress
931           return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
932         }
933       }
934       // if we can't figure it out, then claim no progress
935       return 0f;
936     }
938     /**
939      * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
940      */
941     @Override
942     public long getLength() throws IOException {
943       Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
944       Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
945       int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
946       long diff = 0;
948       byte[] start = startRow.getBytes();
949       byte[] stop = stopRow.getBytes();
950       for (int i = 0; i < maxCommon; ++i) {
951         diff |= 0xff & (start[i] ^ stop[i]);
952         diff <<= Byte.SIZE;
953       }
955       if (startRow.getLength() != stopRow.getLength())
956         diff |= 0xff;
958       return diff + 1;
959     }
961     @Override
962     public String[] getLocations() throws IOException {
963       return locations;
964     }
966     public void setLocations(String[] locations) {
967       this.locations = locations;
968     }
970     @Override
971     public void readFields(DataInput in) throws IOException {
972       range.readFields(in);
973       int numLocs = in.readInt();
974       locations = new String[numLocs];
975       for (int i = 0; i < numLocs; ++i)
976         locations[i] = in.readUTF();
977     }
979     @Override
980     public void write(DataOutput out) throws IOException {
981       range.write(out);
982       out.writeInt(locations.length);
983       for (int i = 0; i < locations.length; ++i)
984         out.writeUTF(locations[i]);
985     }
986   }
988   // ----------------------------------------------------------------------------------------------------
989   // Everything below this line is deprecated and should go away in future versions
990   // ----------------------------------------------------------------------------------------------------
992   /**
993    * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
994    */
995   @Deprecated
996   public static void setIsolated(Configuration conf, boolean enable) {
997     InputConfigurator.setScanIsolation(CLASS, conf, enable);
998   }
1000   /**
1001    * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
1002    */
1003   @Deprecated
1004   public static void setLocalIterators(Configuration conf, boolean enable) {
1005     InputConfigurator.setLocalIterators(CLASS, conf, enable);
1006   }
1008   /**
1009    * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setInputTableName(Job, String)}, and
1010    *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
1011    */
1012   @Deprecated
1013   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
1014     try {
1015       InputConfigurator.setConnectorInfo(CLASS, conf, user, new PasswordToken().setPassword(passwd));
1016     } catch (AccumuloSecurityException e) {
1017       throw new RuntimeException(e);
1018     }
1019     InputConfigurator.setInputTableName(CLASS, conf, table);
1020     InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
1021   }
1023   /**
1024    * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
1025    */
1026   @Deprecated
1027   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
1028     InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
1029   }
1031   /**
1032    * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
1033    */
1034   @Deprecated
1035   public static void setMockInstance(Configuration conf, String instanceName) {
1036     InputConfigurator.setMockInstance(CLASS, conf, instanceName);
1037   }
1039   /**
1040    * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
1041    */
1042   @Deprecated
1043   public static void setRanges(Configuration conf, Collection<Range> ranges) {
1044     InputConfigurator.setRanges(CLASS, conf, ranges);
1045   }
1047   /**
1048    * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
1049    */
1050   @Deprecated
1051   public static void disableAutoAdjustRanges(Configuration conf) {
1052     InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
1053   }
1055   /**
1056    * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
1057    */
1058   @Deprecated
1059   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
1060     IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class);
1061     try {
1062       VersioningIterator.setMaxVersions(vers, maxVersions);
1063     } catch (IllegalArgumentException e) {
1064       throw new IOException(e);
1065     }
1066     InputConfigurator.addIterator(CLASS, conf, vers);
1067   }
1069   /**
1070    * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
1071    */
1072   @Deprecated
1073   public static void setScanOffline(Configuration conf, boolean scanOff) {
1074     InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
1075   }
1077   /**
1078    * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
1079    */
1080   @Deprecated
1081   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
1082     InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
1083   }
1085   /**
1086    * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
1087    */
1088   @Deprecated
1089   public static void setLogLevel(Configuration conf, Level level) {
1090     InputConfigurator.setLogLevel(CLASS, conf, level);
1091   }
1093   /**
1094    * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
1095    */
1096   @Deprecated
1097   public static void addIterator(Configuration conf, IteratorSetting cfg) {
1098     InputConfigurator.addIterator(CLASS, conf, cfg);
1099   }
1101   /**
1102    * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
1103    */
1104   @Deprecated
1105   protected static boolean isIsolated(Configuration conf) {
1106     return InputConfigurator.isIsolated(CLASS, conf);
1107   }
1109   /**
1110    * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
1111    */
1112   @Deprecated
1113   protected static boolean usesLocalIterators(Configuration conf) {
1114     return InputConfigurator.usesLocalIterators(CLASS, conf);
1115   }
1117   /**
1118    * @deprecated since 1.5.0; Use {@link #getPrincipal(JobContext)} instead.
1119    */
1120   @Deprecated
1121   protected static String getPrincipal(Configuration conf) {
1122     return InputConfigurator.getPrincipal(CLASS, conf);
1123   }
1125   /**
1126    * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
1127    */
1128   @Deprecated
1129   protected static byte[] getToken(Configuration conf) {
1130     return InputConfigurator.getToken(CLASS, conf);
1131   }
1133   /**
1134    * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
1135    */
1136   @Deprecated
1137   protected static String getTablename(Configuration conf) {
1138     return InputConfigurator.getInputTableName(CLASS, conf);
1139   }
1141   /**
1142    * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
1143    */
1144   @Deprecated
1145   protected static Authorizations getAuthorizations(Configuration conf) {
1146     return InputConfigurator.getScanAuthorizations(CLASS, conf);
1147   }
1149   /**
1150    * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
1151    */
1152   @Deprecated
1153   protected static Instance getInstance(Configuration conf) {
1154     return InputConfigurator.getInstance(CLASS, conf);
1155   }
1157   /**
1158    * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
1159    */
1160   @Deprecated
1161   protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
1162     return InputConfigurator.getTabletLocator(CLASS, conf);
1163   }
1165   /**
1166    * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
1167    */
1168   @Deprecated
1169   protected static List<Range> getRanges(Configuration conf) throws IOException {
1170     return InputConfigurator.getRanges(CLASS, conf);
1171   }
1173   /**
1174    * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
1175    */
1176   @Deprecated
1177   protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
1178     return InputConfigurator.getFetchedColumns(CLASS, conf);
1179   }
1181   /**
1182    * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
1183    */
1184   @Deprecated
1185   protected static boolean getAutoAdjustRanges(Configuration conf) {
1186     return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
1187   }
1189   /**
1190    * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
1191    */
1192   @Deprecated
1193   protected static Level getLogLevel(Configuration conf) {
1194     return InputConfigurator.getLogLevel(CLASS, conf);
1195   }
1197   /**
1198    * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
1199    */
1200   @Deprecated
1201   protected static void validateOptions(Configuration conf) throws IOException {
1202     InputConfigurator.validateOptions(CLASS, conf);
1203   }
1205   /**
1206    * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
1207    */
1208   @Deprecated
1209   protected static int getMaxVersions(Configuration conf) {
1210     // This is so convoluted, because the only reason to get the number of maxVersions is to construct the same type of IteratorSetting object we have to
1211     // deconstruct to get at this option in the first place, but to preserve correct behavior, this appears necessary.
1212     List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1213     for (IteratorSetting setting : iteratorSettings) {
1214       if ("vers".equals(setting.getName()) && 1 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
1215         if (setting.getOptions().containsKey("maxVersions"))
1216           return Integer.parseInt(setting.getOptions().get("maxVersions"));
1217         else
1218           return -1;
1219       }
1220     }
1221     return -1;
1222   }
1224   /**
1225    * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
1226    */
1227   @Deprecated
1228   protected static boolean isOfflineScan(Configuration conf) {
1229     return InputConfigurator.isOfflineScan(CLASS, conf);
1230   }
1232   /**
1233    * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
1234    */
1235   @Deprecated
1236   protected static List<AccumuloIterator> getIterators(Configuration conf) {
1237     List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1238     List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
1239     for (IteratorSetting setting : iteratorSettings) {
1240       AccumuloIterator deprecatedIter = new AccumuloIterator(new String(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
1241           + AccumuloIterator.FIELD_SEP + setting.getName()));
1242       deprecatedIterators.add(deprecatedIter);
1243     }
1244     return deprecatedIterators;
1245   }
1247   /**
1248    * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
1249    */
1250   @Deprecated
1251   protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
1252     List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1253     List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
1254     for (IteratorSetting setting : iteratorSettings) {
1255       for (Entry<String,String> opt : setting.getOptions().entrySet()) {
1256         String deprecatedOption;
1257         try {
1258           deprecatedOption = new String(setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
1259               + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8"));
1260         } catch (UnsupportedEncodingException e) {
1261           throw new RuntimeException(e);
1262         }
1263         deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
1264       }
1265     }
1266     return deprecatedIteratorOptions;
1267   }
1269   /**
1270    * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
1271    */
1272   @Deprecated
1273   static class AccumuloIterator {
1275     private static final String FIELD_SEP = ":";
1277     private int priority;
1278     private String iteratorClass;
1279     private String iteratorName;
1281     public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
1282       this.priority = priority;
1283       this.iteratorClass = iteratorClass;
1284       this.iteratorName = iteratorName;
1285     }
1287     // Parses out a setting given an string supplied from an earlier toString() call
1288     public AccumuloIterator(String iteratorSetting) {
1289       // Parse the string to expand the iterator
1290       StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
1291       priority = Integer.parseInt(tokenizer.nextToken());
1292       iteratorClass = tokenizer.nextToken();
1293       iteratorName = tokenizer.nextToken();
1294     }
1296     public int getPriority() {
1297       return priority;
1298     }
1300     public String getIteratorClass() {
1301       return iteratorClass;
1302     }
1304     public String getIteratorName() {
1305       return iteratorName;
1306     }
1308     @Override
1309     public String toString() {
1310       return new String(priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName);
1311     }
1313   }
1315   /**
1316    * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
1317    */
1318   @Deprecated
1319   static class AccumuloIteratorOption {
1320     private static final String FIELD_SEP = ":";
1322     private String iteratorName;
1323     private String key;
1324     private String value;
1326     public AccumuloIteratorOption(String iteratorName, String key, String value) {
1327       this.iteratorName = iteratorName;
1328       this.key = key;
1329       this.value = value;
1330     }
1332     // Parses out an option given a string supplied from an earlier toString() call
1333     public AccumuloIteratorOption(String iteratorOption) {
1334       StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
1335       this.iteratorName = tokenizer.nextToken();
1336       try {
1337         this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
1338         this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
1339       } catch (UnsupportedEncodingException e) {
1340         throw new RuntimeException(e);
1341       }
1342     }
1344     public String getIteratorName() {
1345       return iteratorName;
1346     }
1348     public String getKey() {
1349       return key;
1350     }
1352     public String getValue() {
1353       return value;
1354     }
1356     @Override
1357     public String toString() {
1358       try {
1359         return new String(iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8"));
1360       } catch (UnsupportedEncodingException e) {
1361         throw new RuntimeException(e);
1362       }
1363     }
1365   }
1367 }