/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.client.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.OfflineScanner;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.Credential;
import org.apache.accumulo.core.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.tokens.SecurityToken;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
 * <p>
 * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
 * <p>
 * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
 * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V.
 * <p>
 * See {@link AccumuloInputFormat} for an example implementation.
 */
public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {

  private static final Class<?> CLASS = AccumuloInputFormat.class;
  protected static final Logger log = Logger.getLogger(CLASS);

  /**
   * Sets the connector information needed to communicate with Accumulo in this job.
   * 
   * <p>
   * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
   * conversion to a string, and is not intended to be secure.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param principal
   *          a valid Accumulo user name (the user must be able to scan the input table)
   * @param token
   *          the user's {@link SecurityToken} (for example, a {@link PasswordToken})
   * @throws AccumuloSecurityException
   *           if a security error occurs while serializing the token
   * @since 1.5.0
   */
  public static void setConnectorInfo(Job job, String principal, SecurityToken token) throws AccumuloSecurityException {
    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
  }

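  // A minimal sketch (not part of the original API) of configuring connector information
  // for a job. The principal "reader" and the password bytes are hypothetical placeholders;
  // any SecurityToken implementation may be used.
  private static void exampleSetConnectorInfo(Job job) throws AccumuloSecurityException {
    SecurityToken token = new PasswordToken().setPassword("hunter2".getBytes());
    setConnectorInfo(job, "reader", token);
  }
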
  /**
   * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when
   * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the
   * {@link DistributedCache}, for better performance during job execution. The file should contain the principal's token, serialized and base-64 encoded (see
   * {@link CredentialHelper}).
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param path
   *          the path to a file in the configured file system, containing the serialized, base-64 encoded token with the user's authentication
   * @since 1.5.0
   */
  public static void setConnectorInfo(Job job, Path path) {
    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path);
  }

  /**
   * Determines if the connector has been configured.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return true if the connector has been configured, false otherwise
   * @since 1.5.0
   * @see #setConnectorInfo(Job, String, SecurityToken)
   * @see #setConnectorInfo(Job, Path)
   */
  protected static Boolean isConnectorInfoSet(JobContext context) {
    return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
  }

  /**
   * Gets the user name from the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the user name
   * @since 1.5.0
   * @see #setConnectorInfo(Job, String, SecurityToken)
   * @see #setConnectorInfo(Job, Path)
   */
  protected static String getPrincipal(JobContext context) {
    return InputConfigurator.getPrincipal(CLASS, context.getConfiguration());
  }

  /**
   * Gets the serialized token class from the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the token class name
   * @since 1.5.0
   * @see #setConnectorInfo(Job, String, SecurityToken)
   * @see #setConnectorInfo(Job, Path)
   */
  protected static String getTokenClass(JobContext context) {
    return InputConfigurator.getTokenClass(CLASS, context.getConfiguration());
  }

  /**
   * Gets the serialized token from the configuration. WARNING: The token is stored in the Configuration and shared with all MapReduce tasks; it is BASE64
   * encoded to provide a charset safe conversion to a string, and is not intended to be secure.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the decoded token bytes
   * @since 1.5.0
   * @see #setConnectorInfo(Job, String, SecurityToken)
   */
  protected static byte[] getToken(JobContext context) {
    return InputConfigurator.getToken(CLASS, context.getConfiguration());
  }

  /**
   * Configures a {@link ZooKeeperInstance} for this job.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param instanceName
   *          the Accumulo instance name
   * @param zooKeepers
   *          a comma-separated list of ZooKeeper servers
   * @since 1.5.0
   */
  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
    InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
  }

  /**
   * Configures a {@link MockInstance} for this job.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param instanceName
   *          the Accumulo instance name
   * @since 1.5.0
   */
  public static void setMockInstance(Job job, String instanceName) {
    InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
  }

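  // A brief sketch contrasting the two instance types. The instance name and ZooKeeper
  // hosts below are hypothetical; a MockInstance is typically only useful in unit tests.
  private static void exampleSetInstance(Job job, boolean testing) {
    if (testing)
      setMockInstance(job, "testInstance");
    else
      setZooKeeperInstance(job, "myInstance", "zoo1.example.com:2181,zoo2.example.com:2181");
  }
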
  /**
   * Initializes an Accumulo {@link Instance} based on the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return an Accumulo instance
   * @since 1.5.0
   * @see #setZooKeeperInstance(Job, String, String)
   * @see #setMockInstance(Job, String)
   */
  protected static Instance getInstance(JobContext context) {
    return InputConfigurator.getInstance(CLASS, context.getConfiguration());
  }

  /**
   * Sets the log level for this job.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param level
   *          the logging level
   * @since 1.5.0
   */
  public static void setLogLevel(Job job, Level level) {
    InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
  }

  /**
   * Gets the log level from this configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the log level
   * @since 1.5.0
   * @see #setLogLevel(Job, Level)
   */
  protected static Level getLogLevel(JobContext context) {
    return InputConfigurator.getLogLevel(CLASS, context.getConfiguration());
  }

  /**
   * Sets the name of the input table over which this job will scan.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param tableName
   *          the name of the table from which this job will read
   * @since 1.5.0
   */
  public static void setInputTableName(Job job, String tableName) {
    InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
  }

  /**
   * Gets the table name from the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the table name
   * @since 1.5.0
   * @see #setInputTableName(Job, String)
   */
  protected static String getInputTableName(JobContext context) {
    return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
  }

  /**
   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param auths
   *          the user's authorizations
   * @since 1.5.0
   */
  public static void setScanAuthorizations(Job job, Authorizations auths) {
    InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
  }

  /**
   * Gets the authorizations to set for the scans from the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the Accumulo scan authorizations
   * @since 1.5.0
   * @see #setScanAuthorizations(Job, Authorizations)
   */
  protected static Authorizations getScanAuthorizations(JobContext context) {
    return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
  }

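  // A short sketch of selecting the input table and scan authorizations. The table name
  // and authorization labels are hypothetical; the labels must be a subset of those
  // granted to the connecting user.
  private static void exampleSetTableAndAuths(Job job) {
    setInputTableName(job, "mytable");
    setScanAuthorizations(job, new Authorizations("public", "internal"));
  }
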
  /**
   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param ranges
   *          the ranges that will be mapped over
   * @since 1.5.0
   */
  public static void setRanges(Job job, Collection<Range> ranges) {
    InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
  }

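  // A minimal sketch of restricting the job to a set of row ranges. The row boundaries
  // are hypothetical; an empty collection (or no call at all) scans the entire table.
  private static void exampleSetRanges(Job job) {
    List<Range> ranges = new ArrayList<Range>();
    ranges.add(new Range("a", "m")); // rows "a" through "m", inclusive
    ranges.add(new Range(new Text("x"), null)); // rows "x" through the end of the table
    setRanges(job, ranges);
  }
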
  /**
   * Gets the ranges to scan over from a job.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return the ranges
   * @throws IOException
   *           if the ranges have been encoded improperly
   * @since 1.5.0
   * @see #setRanges(Job, Collection)
   */
  protected static List<Range> getRanges(JobContext context) throws IOException {
    return InputConfigurator.getRanges(CLASS, context.getConfiguration());
  }

  /**
   * Restricts the columns that will be mapped over for this job.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param columnFamilyColumnQualifierPairs
   *          a collection of pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire
   *          column family is selected. An empty set is the default and is equivalent to scanning all columns.
   * @since 1.5.0
   */
  public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
    InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
  }

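  // A short sketch of limiting the scan to specific columns. The family and qualifier
  // names are hypothetical; a null qualifier selects the whole column family.
  private static void exampleFetchColumns(Job job) {
    List<Pair<Text,Text>> columns = new ArrayList<Pair<Text,Text>>();
    columns.add(new Pair<Text,Text>(new Text("attributes"), new Text("color")));
    columns.add(new Pair<Text,Text>(new Text("metadata"), null));
    fetchColumns(job, columns);
  }
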
  /**
   * Gets the columns to be mapped over from this job.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return a set of columns
   * @since 1.5.0
   * @see #fetchColumns(Job, Collection)
   */
  protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
    return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
  }

  /**
   * Encode an iterator on the input for this job.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param cfg
   *          the configuration of the iterator
   * @since 1.5.0
   */
  public static void addIterator(Job job, IteratorSetting cfg) {
    InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
  }

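  // A minimal sketch of attaching a scan-time iterator to the job, using the
  // VersioningIterator already imported by this class. The priority and iterator
  // name are hypothetical.
  private static void exampleAddIterator(Job job) {
    IteratorSetting versioning = new IteratorSetting(20, "versions", VersioningIterator.class);
    VersioningIterator.setMaxVersions(versioning, 3); // keep the 3 most recent versions per key
    addIterator(job, versioning);
  }
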
  /**
   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return a list of iterators
   * @since 1.5.0
   * @see #addIterator(Job, IteratorSetting)
   */
  protected static List<IteratorSetting> getIterators(JobContext context) {
    return InputConfigurator.getIterators(CLASS, context.getConfiguration());
  }

  /**
   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
   * Disabling this feature will cause exactly one Map task to be created for each specified range.
   * 
   * <p>
   * By default, this feature is <b>enabled</b>.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @see #setRanges(Job, Collection)
   * @since 1.5.0
   */
  public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
    InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
  }

  /**
   * Determines whether a configuration has auto-adjust ranges enabled.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return false if the feature is disabled, true otherwise
   * @since 1.5.0
   * @see #setAutoAdjustRanges(Job, boolean)
   */
  protected static boolean getAutoAdjustRanges(JobContext context) {
    return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
  }

  /**
   * Controls the use of the {@link IsolatedScanner} in this job.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setScanIsolation(Job job, boolean enableFeature) {
    InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
  }

  /**
   * Determines whether a configuration has isolation enabled.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setScanIsolation(Job, boolean)
   */
  protected static boolean isIsolated(JobContext context) {
    return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
  }

  /**
   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setLocalIterators(Job job, boolean enableFeature) {
    InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
  }

  /**
   * Determines whether a configuration uses local iterators.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setLocalIterators(Job, boolean)
   */
  protected static boolean usesLocalIterators(JobContext context) {
    return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
  }

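  // A brief sketch of the scan-behavior toggles above. All three default settings differ
  // from what is shown here; enabling local iterators requires the iterator classes to be
  // on the mapper's classpath.
  private static void exampleSetScanBehavior(Job job) {
    setScanIsolation(job, true); // read each row in isolation
    setLocalIterators(job, true); // run the iterator stack in the map task
    setAutoAdjustRanges(job, false); // exactly one map task per configured range
  }
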
  /**
   * <p>
   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the MapReduce job directly read the
   * table's files. If the table is not offline, then the job will fail. If the table comes online during the MapReduce job, it is likely that the job will
   * fail.
   * 
   * <p>
   * To use this option, the MapReduce user will need access to read the Accumulo directory in HDFS.
   * 
   * <p>
   * Reading the offline table will create the scan-time iterator stack in the map process. So any iterators that are configured for the table will need to be
   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
   * 
   * <p>
   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a MapReduce job. If you plan to run
   * MapReduce jobs over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all MapReduce jobs. The
   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
   * 
   * <p>
   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
   * 
   * <p>
   * By default, this feature is <b>disabled</b>.
   * 
   * @param job
   *          the Hadoop job instance to be configured
   * @param enableFeature
   *          the feature is enabled if true, disabled otherwise
   * @since 1.5.0
   */
  public static void setOfflineTableScan(Job job, boolean enableFeature) {
    InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
  }

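  // A hedged sketch of the clone-and-take-offline workflow described above. The table
  // names are hypothetical, and the TableOperations calls shown are assumed to be
  // available on the Connector in use.
  private static void exampleOfflineScan(Job job, Connector conn) throws Exception {
    conn.tableOperations().clone("mytable", "mytable_snapshot", true, java.util.Collections.<String,String> emptyMap(),
        java.util.Collections.<String> emptySet());
    conn.tableOperations().offline("mytable_snapshot");
    setInputTableName(job, "mytable_snapshot");
    setOfflineTableScan(job, true);
  }
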
  /**
   * Determines whether a configuration has the offline table scan feature enabled.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return true if the feature is enabled, false otherwise
   * @since 1.5.0
   * @see #setOfflineTableScan(Job, boolean)
   */
  protected static boolean isOfflineScan(JobContext context) {
    return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
  }

  /**
   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @return an Accumulo tablet locator
   * @throws TableNotFoundException
   *           if the table name set on the configuration doesn't exist
   * @since 1.5.0
   */
  protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
    return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration());
  }

  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
  /**
   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
   * 
   * @param context
   *          the Hadoop context for the configured job
   * @throws IOException
   *           if the context is improperly configured
   * @since 1.5.0
   */
  protected static void validateOptions(JobContext context) throws IOException {
    InputConfigurator.validateOptions(CLASS, context.getConfiguration());
  }

  /**
   * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
   * types.
   * 
   * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
   * <ul>
   * <li>K {@link #currentK}</li>
   * <li>V {@link #currentV}</li>
   * <li>Key {@link #currentKey} (used for progress reporting)</li>
   * <li>long {@link #numKeysRead} (used for progress reporting)</li>
   * </ul>
   */
  protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
    protected long numKeysRead;
    protected Iterator<Entry<Key,Value>> scannerIterator;
    protected RangeInputSplit split;

    /**
     * Apply the configured iterators from the configuration to the scanner.
     * 
     * @param context
     *          the Hadoop context for the configured job
     * @param scanner
     *          the scanner to configure
     */
    protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
      List<IteratorSetting> iterators = getIterators(context);
      for (IteratorSetting iterator : iterators) {
        scanner.addScanIterator(iterator);
      }
    }

    /**
     * Initialize a scanner over the given input split using this task attempt configuration.
     */
    @Override
    public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
      Scanner scanner;
      split = (RangeInputSplit) inSplit;
      log.debug("Initializing input split: " + split.range);
      Instance instance = getInstance(attempt);
      String principal = getPrincipal(attempt);
      String tokenClass = getTokenClass(attempt);
      byte[] token = getToken(attempt);
      Authorizations authorizations = getScanAuthorizations(attempt);

      try {
        log.debug("Creating connector with user: " + principal);
        Connector conn = instance.getConnector(principal, CredentialHelper.extractToken(tokenClass, token));
        log.debug("Creating scanner for table: " + getInputTableName(attempt));
        log.debug("Authorizations are: " + authorizations);
        if (isOfflineScan(attempt)) {
          scanner = new OfflineScanner(instance, new Credential(principal, tokenClass, ByteBuffer.wrap(token), instance.getInstanceID()), Tables.getTableId(
              instance, getInputTableName(attempt)), authorizations);
        } else {
          scanner = conn.createScanner(getInputTableName(attempt), authorizations);
        }
        if (isIsolated(attempt)) {
          log.info("Creating isolated scanner");
          scanner = new IsolatedScanner(scanner);
        }
        if (usesLocalIterators(attempt)) {
          log.info("Using local iterators");
          scanner = new ClientSideIteratorScanner(scanner);
        }
        setupIterators(attempt, scanner);
      } catch (Exception e) {
        throw new IOException(e);
      }

      // setup a scanner within the bounds of this split
      for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
        if (c.getSecond() != null) {
          log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
          scanner.fetchColumn(c.getFirst(), c.getSecond());
        } else {
          log.debug("Fetching column family " + c.getFirst());
          scanner.fetchColumnFamily(c.getFirst());
        }
      }

      scanner.setRange(split.range);

      numKeysRead = 0;

      // do this last after setting all scanner options
      scannerIterator = scanner.iterator();
    }

    @Override
    public void close() {}

    @Override
    public float getProgress() throws IOException {
      if (numKeysRead > 0 && currentKey == null)
        return 1.0f;
      return split.getProgress(currentKey);
    }

    protected K currentK = null;
    protected V currentV = null;
    protected Key currentKey = null;
    protected Value currentValue = null;

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return currentK;
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return currentV;
    }
  }

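  // A minimal sketch of a concrete RecordReaderBase, following the pattern
  // AccumuloInputFormat uses: each Accumulo Key/Value pair is passed through
  // unchanged as the map input K/V. The class name is hypothetical.
  private static class ExampleRecordReader extends RecordReaderBase<Key,Value> {
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!scannerIterator.hasNext())
        return false;
      ++numKeysRead;
      Entry<Key,Value> entry = scannerIterator.next();
      currentK = currentKey = entry.getKey();
      currentV = currentValue = entry.getValue();
      return true;
    }
  }
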
  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
      AccumuloException, AccumuloSecurityException {

    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();

    Instance instance = getInstance(context);
    Connector conn = instance.getConnector(getPrincipal(context), getToken(context));
    String tableId = Tables.getTableId(instance, tableName);

    if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
      Tables.clearCache(instance);
      if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
        throw new AccumuloException("Table is online " + tableName + " (" + tableId + "), cannot scan table in offline mode");
      }
    }

    for (Range range : ranges) {
      Text startRow;

      if (range.getStartKey() != null)
        startRow = range.getStartKey().getRow();
      else
        startRow = new Text();

      Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
      Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
      Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
      scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
      scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
      scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
      scanner.setRange(metadataRange);

      RowIterator rowIter = new RowIterator(scanner);

      // TODO check that extents match prev extent

      KeyExtent lastExtent = null;

      while (rowIter.hasNext()) {
        Iterator<Entry<Key,Value>> row = rowIter.next();
        String last = "";
        KeyExtent extent = null;
        String location = null;

        while (row.hasNext()) {
          Entry<Key,Value> entry = row.next();
          Key key = entry.getKey();

          if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
            last = entry.getValue().toString();
          }

          if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
              || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
            location = entry.getValue().toString();
          }

          if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
            extent = new KeyExtent(key.getRow(), entry.getValue());
          }

        }

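        // a location entry means this tablet is still assigned to a tablet server;
        // returning null tells the caller to wait and rescan until the table is fully offline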
        if (location != null)
          return null;

        if (!extent.getTableId().toString().equals(tableId)) {
          throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
        }

        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
          throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
        }

        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
        if (tabletRanges == null) {
          tabletRanges = new HashMap<KeyExtent,List<Range>>();
          binnedRanges.put(last, tabletRanges);
        }

        List<Range> rangeList = tabletRanges.get(extent);
        if (rangeList == null) {
          rangeList = new ArrayList<Range>();
          tabletRanges.put(extent, rangeList);
        }

        rangeList.add(range);

        if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
          break;
        }

        lastExtent = extent;
      }

    }

    return binnedRanges;
  }

  /**
   * Read the metadata table to get tablets and match up ranges to them.
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    log.setLevel(getLogLevel(context));
    validateOptions(context);

    String tableName = getInputTableName(context);
    boolean autoAdjust = getAutoAdjustRanges(context);
    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);

    if (ranges.isEmpty()) {
      ranges = new ArrayList<Range>(1);
      ranges.add(new Range());
    }

    // get the metadata information for these ranges
    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
    TabletLocator tl;
    try {
      if (isOfflineScan(context)) {
        binnedRanges = binOfflineTable(context, tableName, ranges);
        while (binnedRanges == null) {
          // some tablets were still online, try again
          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
          binnedRanges = binOfflineTable(context, tableName, ranges);
        }
      } else {
        Instance instance = getInstance(context);
        String tableId = null;
        tl = getTabletLocator(context);
        // it's possible that the cache could contain complete, but old information about a table's tablets... so clear it
        tl.invalidateCache();
        while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
          if (!(instance instanceof MockInstance)) {
            if (tableId == null)
              tableId = Tables.getTableId(instance, tableName);
            if (!Tables.exists(instance, tableId))
              throw new TableDeletedException(tableId);
            if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
              throw new TableOfflineException(instance, tableId);
          }
          binnedRanges.clear();
          log.warn("Unable to locate bins for specified ranges. Retrying.");
          UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
          tl.invalidateCache();
        }
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
    HashMap<Range,ArrayList<String>> splitsToAdd = null;

    if (!autoAdjust)
      splitsToAdd = new HashMap<Range,ArrayList<String>>();

    HashMap<String,String> hostNameCache = new HashMap<String,String>();

    for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
      String ip = tserverBin.getKey().split(":", 2)[0];
      String location = hostNameCache.get(ip);
      if (location == null) {
        InetAddress inetAddress = InetAddress.getByName(ip);
        location = inetAddress.getHostName();
        hostNameCache.put(ip, location);
      }

      for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
        Range ke = extentRanges.getKey().toDataRange();
        for (Range r : extentRanges.getValue()) {
          if (autoAdjust) {
            // divide ranges into smaller ranges, based on the tablets
            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
          } else {
            // don't divide ranges
            ArrayList<String> locations = splitsToAdd.get(r);
            if (locations == null)
              locations = new ArrayList<String>(1);
            locations.add(location);
            splitsToAdd.put(r, locations);
          }
        }
      }
    }

    if (!autoAdjust)
      for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
    return splits;
  }

  /**
   * The Class RangeInputSplit. Encapsulates an Accumulo range for use in MapReduce jobs.
   */
  public static class RangeInputSplit extends InputSplit implements Writable {
    private Range range;
    private String[] locations;

    public RangeInputSplit() {
      range = new Range();
      locations = new String[0];
    }

    public RangeInputSplit(RangeInputSplit split) throws IOException {
      this.setRange(split.getRange());
      this.setLocations(split.getLocations());
    }

    protected RangeInputSplit(String table, Range range, String[] locations) {
      this.range = range;
      this.locations = locations;
    }

    public Range getRange() {
      return range;
    }

    public void setRange(Range range) {
      this.range = range;
    }

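    // copies the first numBytes bytes of the sequence into a buffer prefixed with a zero
    // byte, so the BigInteger constructed from the result in getProgress() is always
    // non-negative; positions past the end of the sequence are zero-filled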
    private static byte[] extractBytes(ByteSequence seq, int numBytes) {
      byte[] bytes = new byte[numBytes + 1];
      bytes[0] = 0;
      for (int i = 0; i < numBytes; i++) {
        if (i >= seq.length())
          bytes[i + 1] = 0;
        else
          bytes[i + 1] = seq.byteAt(i);
      }
      return bytes;
    }

    public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
      int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
      BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
      BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
      BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
      return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
    }

    public float getProgress(Key currentKey) {
      if (currentKey == null)
        return 0f;
      if (range.getStartKey() != null && range.getEndKey() != null) {
        if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW)) {
          // just look at the row progress
          return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
        } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM)) {
          // just look at the column family progress
          return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
        } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
          // just look at the column qualifier progress
          return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
        }
      }
      // if we can't figure it out, then claim no progress
      return 0f;
    }

    /**
     * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
     */
    @Override
    public long getLength() throws IOException {
      Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
      Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
      int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
      long diff = 0;

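      // pack the XOR of up to the first 7 byte pairs of the start and stop rows into a
      // long, as a rough numeric measure of how far apart the two rows are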
      byte[] start = startRow.getBytes();
      byte[] stop = stopRow.getBytes();
      for (int i = 0; i < maxCommon; ++i) {
        diff |= 0xff & (start[i] ^ stop[i]);
        diff <<= Byte.SIZE;
      }

      if (startRow.getLength() != stopRow.getLength())
        diff |= 0xff;

      return diff + 1;
    }

    @Override
    public String[] getLocations() throws IOException {
      return locations;
    }

    public void setLocations(String[] locations) {
      this.locations = locations;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      range.readFields(in);
      int numLocs = in.readInt();
      locations = new String[numLocs];
      for (int i = 0; i < numLocs; ++i)
        locations[i] = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      range.write(out);
      out.writeInt(locations.length);
      for (int i = 0; i < locations.length; ++i)
        out.writeUTF(locations[i]);
    }
  }

  // ----------------------------------------------------------------------------------------------------
  // Everything below this line is deprecated and should go away in future versions
  // ----------------------------------------------------------------------------------------------------

  /**
   * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
   */
  @Deprecated
  public static void setIsolated(Configuration conf, boolean enable) {
    InputConfigurator.setScanIsolation(CLASS, conf, enable);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
   */
  @Deprecated
  public static void setLocalIterators(Configuration conf, boolean enable) {
    InputConfigurator.setLocalIterators(CLASS, conf, enable);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, SecurityToken)}, {@link #setInputTableName(Job, String)}, and
   *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
   */
  @Deprecated
  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
    try {
      InputConfigurator.setConnectorInfo(CLASS, conf, user, new PasswordToken().setPassword(passwd));
    } catch (AccumuloSecurityException e) {
      throw new RuntimeException(e);
    }
    InputConfigurator.setInputTableName(CLASS, conf, table);
    InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
   */
  @Deprecated
  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
    InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
   */
  @Deprecated
  public static void setMockInstance(Configuration conf, String instanceName) {
    InputConfigurator.setMockInstance(CLASS, conf, instanceName);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
   */
  @Deprecated
  public static void setRanges(Configuration conf, Collection<Range> ranges) {
    InputConfigurator.setRanges(CLASS, conf, ranges);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
   */
  @Deprecated
  public static void disableAutoAdjustRanges(Configuration conf) {
    InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
   */
  @Deprecated
  public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
    IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class);
    try {
      VersioningIterator.setMaxVersions(vers, maxVersions);
    } catch (IllegalArgumentException e) {
      throw new IOException(e);
    }
    InputConfigurator.addIterator(CLASS, conf, vers);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
   */
  @Deprecated
  public static void setScanOffline(Configuration conf, boolean scanOff) {
    InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
   */
  @Deprecated
  public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
    InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
   */
  @Deprecated
  public static void setLogLevel(Configuration conf, Level level) {
    InputConfigurator.setLogLevel(CLASS, conf, level);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
   */
  @Deprecated
  public static void addIterator(Configuration conf, IteratorSetting cfg) {
    InputConfigurator.addIterator(CLASS, conf, cfg);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
   */
  @Deprecated
  protected static boolean isIsolated(Configuration conf) {
    return InputConfigurator.isIsolated(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
   */
  @Deprecated
  protected static boolean usesLocalIterators(Configuration conf) {
    return InputConfigurator.usesLocalIterators(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getPrincipal(JobContext)} instead.
   */
  @Deprecated
  protected static String getPrincipal(Configuration conf) {
    return InputConfigurator.getPrincipal(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
   */
  @Deprecated
  protected static byte[] getToken(Configuration conf) {
    return InputConfigurator.getToken(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
   */
  @Deprecated
  protected static String getTablename(Configuration conf) {
    return InputConfigurator.getInputTableName(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
   */
  @Deprecated
  protected static Authorizations getAuthorizations(Configuration conf) {
    return InputConfigurator.getScanAuthorizations(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
   */
  @Deprecated
  protected static Instance getInstance(Configuration conf) {
    return InputConfigurator.getInstance(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
   */
  @Deprecated
  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
    return InputConfigurator.getTabletLocator(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
   */
  @Deprecated
  protected static List<Range> getRanges(Configuration conf) throws IOException {
    return InputConfigurator.getRanges(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
   */
  @Deprecated
  protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
    return InputConfigurator.getFetchedColumns(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
   */
  @Deprecated
  protected static boolean getAutoAdjustRanges(Configuration conf) {
    return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
   */
  @Deprecated
  protected static Level getLogLevel(Configuration conf) {
    return InputConfigurator.getLogLevel(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
   */
  @Deprecated
  protected static void validateOptions(Configuration conf) throws IOException {
    InputConfigurator.validateOptions(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
   */
  @Deprecated
  protected static int getMaxVersions(Configuration conf) {
    // This is so convoluted, because the only reason to get the number of maxVersions is to construct the same type of IteratorSetting object we have to
    // deconstruct to get at this option in the first place, but to preserve correct behavior, this appears necessary.
    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
    for (IteratorSetting setting : iteratorSettings) {
      if ("vers".equals(setting.getName()) && 1 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
        if (setting.getOptions().containsKey("maxVersions"))
          return Integer.parseInt(setting.getOptions().get("maxVersions"));
        else
          return -1;
      }
    }
    return -1;
  }

  /**
   * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
   */
  @Deprecated
  protected static boolean isOfflineScan(Configuration conf) {
    return InputConfigurator.isOfflineScan(CLASS, conf);
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
   */
  @Deprecated
  protected static List<AccumuloIterator> getIterators(Configuration conf) {
    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
    List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
    for (IteratorSetting setting : iteratorSettings) {
      AccumuloIterator deprecatedIter = new AccumuloIterator(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
          + AccumuloIterator.FIELD_SEP + setting.getName());
      deprecatedIterators.add(deprecatedIter);
    }
    return deprecatedIterators;
  }

  /**
   * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
   */
  @Deprecated
  protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
    List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
    List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
    for (IteratorSetting setting : iteratorSettings) {
      for (Entry<String,String> opt : setting.getOptions().entrySet()) {
        String deprecatedOption;
        try {
          deprecatedOption = setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
              + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
          throw new RuntimeException(e);
        }
        deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
      }
    }
    return deprecatedIteratorOptions;
  }

  /**
   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
   */
  @Deprecated
  static class AccumuloIterator {

    private static final String FIELD_SEP = ":";

    private int priority;
    private String iteratorClass;
    private String iteratorName;

    public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
      this.priority = priority;
      this.iteratorClass = iteratorClass;
      this.iteratorName = iteratorName;
    }

    // Parses out a setting given a string supplied from an earlier toString() call
    public AccumuloIterator(String iteratorSetting) {
      // Parse the string to expand the iterator
      StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
      priority = Integer.parseInt(tokenizer.nextToken());
      iteratorClass = tokenizer.nextToken();
      iteratorName = tokenizer.nextToken();
    }

    public int getPriority() {
      return priority;
    }

    public String getIteratorClass() {
      return iteratorClass;
    }

    public String getIteratorName() {
      return iteratorName;
    }

    @Override
    public String toString() {
      return priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName;
    }

  }

  /**
   * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
   */
  @Deprecated
  static class AccumuloIteratorOption {
    private static final String FIELD_SEP = ":";

    private String iteratorName;
    private String key;
    private String value;

    public AccumuloIteratorOption(String iteratorName, String key, String value) {
      this.iteratorName = iteratorName;
      this.key = key;
      this.value = value;
    }

    // Parses out an option given a string supplied from an earlier toString() call
    public AccumuloIteratorOption(String iteratorOption) {
      StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
      this.iteratorName = tokenizer.nextToken();
      try {
        this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
        this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
      } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
      }
    }

    public String getIteratorName() {
      return iteratorName;
    }

    public String getKey() {
      return key;
    }

    public String getValue() {
      return value;
    }

    @Override
    public String toString() {
      try {
        return iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8");
      } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
      }
    }

  }

}