1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.mapreduce;
18
19 import java.io.DataInput;
20 import java.io.DataOutput;
21 import java.io.IOException;
22 import java.io.UnsupportedEncodingException;
23 import java.math.BigInteger;
24 import java.net.InetAddress;
25 import java.net.URLDecoder;
26 import java.net.URLEncoder;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Set;
36 import java.util.StringTokenizer;
37
38 import org.apache.accumulo.core.Constants;
39 import org.apache.accumulo.core.client.AccumuloException;
40 import org.apache.accumulo.core.client.AccumuloSecurityException;
41 import org.apache.accumulo.core.client.ClientSideIteratorScanner;
42 import org.apache.accumulo.core.client.Connector;
43 import org.apache.accumulo.core.client.Instance;
44 import org.apache.accumulo.core.client.IsolatedScanner;
45 import org.apache.accumulo.core.client.IteratorSetting;
46 import org.apache.accumulo.core.client.RowIterator;
47 import org.apache.accumulo.core.client.Scanner;
48 import org.apache.accumulo.core.client.TableDeletedException;
49 import org.apache.accumulo.core.client.TableNotFoundException;
50 import org.apache.accumulo.core.client.TableOfflineException;
51 import org.apache.accumulo.core.client.ZooKeeperInstance;
52 import org.apache.accumulo.core.client.impl.OfflineScanner;
53 import org.apache.accumulo.core.client.impl.Tables;
54 import org.apache.accumulo.core.client.impl.TabletLocator;
55 import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
56 import org.apache.accumulo.core.client.mock.MockInstance;
57 import org.apache.accumulo.core.data.ByteSequence;
58 import org.apache.accumulo.core.data.Key;
59 import org.apache.accumulo.core.data.KeyExtent;
60 import org.apache.accumulo.core.data.PartialKey;
61 import org.apache.accumulo.core.data.Range;
62 import org.apache.accumulo.core.data.Value;
63 import org.apache.accumulo.core.iterators.user.VersioningIterator;
64 import org.apache.accumulo.core.master.state.tables.TableState;
65 import org.apache.accumulo.core.security.Authorizations;
66 import org.apache.accumulo.core.security.CredentialHelper;
67 import org.apache.accumulo.core.security.thrift.Credential;
68 import org.apache.accumulo.core.security.tokens.PasswordToken;
69 import org.apache.accumulo.core.security.tokens.SecurityToken;
70 import org.apache.accumulo.core.util.Pair;
71 import org.apache.accumulo.core.util.UtilWaitThread;
72 import org.apache.hadoop.conf.Configuration;
73 import org.apache.hadoop.filecache.DistributedCache;
74 import org.apache.hadoop.fs.Path;
75 import org.apache.hadoop.io.Text;
76 import org.apache.hadoop.io.Writable;
77 import org.apache.hadoop.mapreduce.InputFormat;
78 import org.apache.hadoop.mapreduce.InputSplit;
79 import org.apache.hadoop.mapreduce.Job;
80 import org.apache.hadoop.mapreduce.JobContext;
81 import org.apache.hadoop.mapreduce.RecordReader;
82 import org.apache.hadoop.mapreduce.TaskAttemptContext;
83 import org.apache.log4j.Level;
84 import org.apache.log4j.Logger;
85
86 /**
87 * This abstract {@link InputFormat} class allows MapReduce jobs to use Accumulo as the source of K,V pairs.
88 * <p>
89 * Subclasses must implement a {@link #createRecordReader(InputSplit, TaskAttemptContext)} to provide a {@link RecordReader} for K,V.
90 * <p>
91 * A static base class, RecordReaderBase, is provided to retrieve Accumulo {@link Key}/{@link Value} pairs, but one must implement its
92 * {@link RecordReaderBase#nextKeyValue()} to transform them to the desired generic types K,V.
93 * <p>
94 * See {@link AccumuloInputFormat} for an example implementation.
95 */
96 public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
97
98 private static final Class<?> CLASS = AccumuloInputFormat.class;
99 protected static final Logger log = Logger.getLogger(CLASS);
100
101 /**
102 * Sets the connector information needed to communicate with Accumulo in this job.
103 *
104 * <p>
105 * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
106 * conversion to a string, and is not intended to be secure.
107 *
108 * @param job
109 * the Hadoop job instance to be configured
110 * @param principal
111 * a valid Accumulo user name (user must have Table.CREATE permission)
112 * @param token
113 * the user's password
114 * @throws AccumuloSecurityException
115 * @since 1.5.0
116 */
117 public static void setConnectorInfo(Job job, String principal, SecurityToken token) throws AccumuloSecurityException {
118 InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
119 }
120
121 /**
122 * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when
123 * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the
124 * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using
125 * {@link TokenHelper#asBase64String(AccumuloToken)}.
126 *
127 * @param job
128 * the Hadoop job instance to be configured
129 * @param path
130 * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
131 * @since 1.5.0
132 */
133 public static void setConnectorInfo(Job job, Path path) {
134 InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path);
135 }
136
137 /**
138 * Determines if the connector has been configured.
139 *
140 * @param context
141 * the Hadoop context for the configured job
142 * @return true if the connector has been configured, false otherwise
143 * @since 1.5.0
144 * @see #setConnectorInfo(Job, String, byte[])
145 * @see #setConnectorInfo(Job, Path)
146 */
147 protected static Boolean isConnectorInfoSet(JobContext context) {
148 return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
149 }
150
151 /**
152 * Gets the user name from the configuration.
153 *
154 * @param context
155 * the Hadoop context for the configured job
156 * @return the user name
157 * @since 1.5.0
158 * @see #setConnectorInfo(Job, String, SecurityToken)
159 * @see #setConnectorInfo(Job, Path)
160 */
161 protected static String getPrincipal(JobContext context) {
162 return InputConfigurator.getPrincipal(CLASS, context.getConfiguration());
163 }
164
165 /**
166 * Gets the serialized token class from the configuration.
167 *
168 * @param context
169 * the Hadoop context for the configured job
170 * @return the user name
171 * @since 1.5.0
172 * @see #setConnectorInfo(Job, String, SecurityToken)
173 * @see #setConnectorInfo(Job, Path)
174 */
175 protected static String getTokenClass(JobContext context) {
176 return InputConfigurator.getTokenClass(CLASS, context.getConfiguration());
177 }
178
179 /**
180 * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
181 * provide a charset safe conversion to a string, and is not intended to be secure.
182 *
183 * @param context
184 * the Hadoop context for the configured job
185 * @return the decoded user password
186 * @since 1.5.0
187 * @see #setConnectorInfo(Job, String, byte[])
188 */
189 protected static byte[] getToken(JobContext context) {
190 return InputConfigurator.getToken(CLASS, context.getConfiguration());
191 }
192
193 /**
194 * Configures a {@link ZooKeeperInstance} for this job.
195 *
196 * @param job
197 * the Hadoop job instance to be configured
198 * @param instanceName
199 * the Accumulo instance name
200 * @param zooKeepers
201 * a comma-separated list of zookeeper servers
202 * @since 1.5.0
203 */
204 public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
205 InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
206 }
207
208 /**
209 * Configures a {@link MockInstance} for this job.
210 *
211 * @param job
212 * the Hadoop job instance to be configured
213 * @param instanceName
214 * the Accumulo instance name
215 * @since 1.5.0
216 */
217 public static void setMockInstance(Job job, String instanceName) {
218 InputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
219 }
220
221 /**
222 * Initializes an Accumulo {@link Instance} based on the configuration.
223 *
224 * @param context
225 * the Hadoop context for the configured job
226 * @return an Accumulo instance
227 * @since 1.5.0
228 * @see #setZooKeeperInstance(Job, String, String)
229 * @see #setMockInstance(Job, String)
230 */
231 protected static Instance getInstance(JobContext context) {
232 return InputConfigurator.getInstance(CLASS, context.getConfiguration());
233 }
234
235 /**
236 * Sets the log level for this job.
237 *
238 * @param job
239 * the Hadoop job instance to be configured
240 * @param level
241 * the logging level
242 * @since 1.5.0
243 */
244 public static void setLogLevel(Job job, Level level) {
245 InputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
246 }
247
248 /**
249 * Gets the log level from this configuration.
250 *
251 * @param context
252 * the Hadoop context for the configured job
253 * @return the log level
254 * @since 1.5.0
255 * @see #setLogLevel(Job, Level)
256 */
257 protected static Level getLogLevel(JobContext context) {
258 return InputConfigurator.getLogLevel(CLASS, context.getConfiguration());
259 }
260
261 /**
262 * Sets the name of the input table, over which this job will scan.
263 *
264 * @param job
265 * the Hadoop job instance to be configured
266 * @param tableName
267 * the table to use when the tablename is null in the write call
268 * @since 1.5.0
269 */
270 public static void setInputTableName(Job job, String tableName) {
271 InputConfigurator.setInputTableName(CLASS, job.getConfiguration(), tableName);
272 }
273
274 /**
275 * Gets the table name from the configuration.
276 *
277 * @param context
278 * the Hadoop context for the configured job
279 * @return the table name
280 * @since 1.5.0
281 * @see #setInputTableName(Job, String)
282 */
283 protected static String getInputTableName(JobContext context) {
284 return InputConfigurator.getInputTableName(CLASS, context.getConfiguration());
285 }
286
287 /**
288 * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set.
289 *
290 * @param job
291 * the Hadoop job instance to be configured
292 * @param auths
293 * the user's authorizations
294 * @since 1.5.0
295 */
296 public static void setScanAuthorizations(Job job, Authorizations auths) {
297 InputConfigurator.setScanAuthorizations(CLASS, job.getConfiguration(), auths);
298 }
299
300 /**
301 * Gets the authorizations to set for the scans from the configuration.
302 *
303 * @param context
304 * the Hadoop context for the configured job
305 * @return the Accumulo scan authorizations
306 * @since 1.5.0
307 * @see #setScanAuthorizations(Job, Authorizations)
308 */
309 protected static Authorizations getScanAuthorizations(JobContext context) {
310 return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
311 }
312
313 /**
314 * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
315 *
316 * @param job
317 * the Hadoop job instance to be configured
318 * @param ranges
319 * the ranges that will be mapped over
320 * @since 1.5.0
321 */
322 public static void setRanges(Job job, Collection<Range> ranges) {
323 InputConfigurator.setRanges(CLASS, job.getConfiguration(), ranges);
324 }
325
326 /**
327 * Gets the ranges to scan over from a job.
328 *
329 * @param context
330 * the Hadoop context for the configured job
331 * @return the ranges
332 * @throws IOException
333 * if the ranges have been encoded improperly
334 * @since 1.5.0
335 * @see #setRanges(Job, Collection)
336 */
337 protected static List<Range> getRanges(JobContext context) throws IOException {
338 return InputConfigurator.getRanges(CLASS, context.getConfiguration());
339 }
340
341 /**
342 * Restricts the columns that will be mapped over for this job.
343 *
344 * @param job
345 * the Hadoop job instance to be configured
346 * @param columnFamilyColumnQualifierPairs
347 * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
348 * selected. An empty set is the default and is equivalent to scanning the all columns.
349 * @since 1.5.0
350 */
351 public static void fetchColumns(Job job, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
352 InputConfigurator.fetchColumns(CLASS, job.getConfiguration(), columnFamilyColumnQualifierPairs);
353 }
354
355 /**
356 * Gets the columns to be mapped over from this job.
357 *
358 * @param context
359 * the Hadoop context for the configured job
360 * @return a set of columns
361 * @since 1.5.0
362 * @see #fetchColumns(Job, Collection)
363 */
364 protected static Set<Pair<Text,Text>> getFetchedColumns(JobContext context) {
365 return InputConfigurator.getFetchedColumns(CLASS, context.getConfiguration());
366 }
367
368 /**
369 * Encode an iterator on the input for this job.
370 *
371 * @param job
372 * the Hadoop job instance to be configured
373 * @param cfg
374 * the configuration of the iterator
375 * @since 1.5.0
376 */
377 public static void addIterator(Job job, IteratorSetting cfg) {
378 InputConfigurator.addIterator(CLASS, job.getConfiguration(), cfg);
379 }
380
381 /**
382 * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
383 *
384 * @param context
385 * the Hadoop context for the configured job
386 * @return a list of iterators
387 * @since 1.5.0
388 * @see #addIterator(Job, IteratorSetting)
389 */
390 protected static List<IteratorSetting> getIterators(JobContext context) {
391 return InputConfigurator.getIterators(CLASS, context.getConfiguration());
392 }
393
394 /**
395 * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
396 * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
397 *
398 * <p>
399 * By default, this feature is <b>enabled</b>.
400 *
401 * @param job
402 * the Hadoop job instance to be configured
403 * @param enableFeature
404 * the feature is enabled if true, disabled otherwise
405 * @see #setRanges(Job, Collection)
406 * @since 1.5.0
407 */
408 public static void setAutoAdjustRanges(Job job, boolean enableFeature) {
409 InputConfigurator.setAutoAdjustRanges(CLASS, job.getConfiguration(), enableFeature);
410 }
411
412 /**
413 * Determines whether a configuration has auto-adjust ranges enabled.
414 *
415 * @param context
416 * the Hadoop context for the configured job
417 * @return false if the feature is disabled, true otherwise
418 * @since 1.5.0
419 * @see #setAutoAdjustRanges(Job, boolean)
420 */
421 protected static boolean getAutoAdjustRanges(JobContext context) {
422 return InputConfigurator.getAutoAdjustRanges(CLASS, context.getConfiguration());
423 }
424
425 /**
426 * Controls the use of the {@link IsolatedScanner} in this job.
427 *
428 * <p>
429 * By default, this feature is <b>disabled</b>.
430 *
431 * @param job
432 * the Hadoop job instance to be configured
433 * @param enableFeature
434 * the feature is enabled if true, disabled otherwise
435 * @since 1.5.0
436 */
437 public static void setScanIsolation(Job job, boolean enableFeature) {
438 InputConfigurator.setScanIsolation(CLASS, job.getConfiguration(), enableFeature);
439 }
440
441 /**
442 * Determines whether a configuration has isolation enabled.
443 *
444 * @param context
445 * the Hadoop context for the configured job
446 * @return true if the feature is enabled, false otherwise
447 * @since 1.5.0
448 * @see #setScanIsolation(Job, boolean)
449 */
450 protected static boolean isIsolated(JobContext context) {
451 return InputConfigurator.isIsolated(CLASS, context.getConfiguration());
452 }
453
454 /**
455 * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
456 * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
457 *
458 * <p>
459 * By default, this feature is <b>disabled</b>.
460 *
461 * @param job
462 * the Hadoop job instance to be configured
463 * @param enableFeature
464 * the feature is enabled if true, disabled otherwise
465 * @since 1.5.0
466 */
467 public static void setLocalIterators(Job job, boolean enableFeature) {
468 InputConfigurator.setLocalIterators(CLASS, job.getConfiguration(), enableFeature);
469 }
470
471 /**
472 * Determines whether a configuration uses local iterators.
473 *
474 * @param context
475 * the Hadoop context for the configured job
476 * @return true if the feature is enabled, false otherwise
477 * @since 1.5.0
478 * @see #setLocalIterators(Job, boolean)
479 */
480 protected static boolean usesLocalIterators(JobContext context) {
481 return InputConfigurator.usesLocalIterators(CLASS, context.getConfiguration());
482 }
483
484 /**
485 * <p>
486 * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
487 * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
488 * fail.
489 *
490 * <p>
491 * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
492 *
493 * <p>
494 * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
495 * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
496 *
497 * <p>
498 * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
499 * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
500 * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
501 *
502 * <p>
503 * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
504 * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
505 *
506 * <p>
507 * By default, this feature is <b>disabled</b>.
508 *
509 * @param job
510 * the Hadoop job instance to be configured
511 * @param enableFeature
512 * the feature is enabled if true, disabled otherwise
513 * @since 1.5.0
514 */
515 public static void setOfflineTableScan(Job job, boolean enableFeature) {
516 InputConfigurator.setOfflineTableScan(CLASS, job.getConfiguration(), enableFeature);
517 }
518
519 /**
520 * Determines whether a configuration has the offline table scan feature enabled.
521 *
522 * @param context
523 * the Hadoop context for the configured job
524 * @return true if the feature is enabled, false otherwise
525 * @since 1.5.0
526 * @see #setOfflineTableScan(Job, boolean)
527 */
528 protected static boolean isOfflineScan(JobContext context) {
529 return InputConfigurator.isOfflineScan(CLASS, context.getConfiguration());
530 }
531
532 /**
533 * Initializes an Accumulo {@link TabletLocator} based on the configuration.
534 *
535 * @param context
536 * the Hadoop context for the configured job
537 * @return an Accumulo tablet locator
538 * @throws TableNotFoundException
539 * if the table name set on the configuration doesn't exist
540 * @since 1.5.0
541 */
542 protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException {
543 return InputConfigurator.getTabletLocator(CLASS, context.getConfiguration());
544 }
545
546
547 /**
548 * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
549 *
550 * @param context
551 * the Hadoop context for the configured job
552 * @throws IOException
553 * if the context is improperly configured
554 * @since 1.5.0
555 */
556 protected static void validateOptions(JobContext context) throws IOException {
557 InputConfigurator.validateOptions(CLASS, context.getConfiguration());
558 }
559
560 /**
561 * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V
562 * types.
563 *
564 * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables:
565 * <ul>
566 * <li>K {@link #currentK}</li>
567 * <li>V {@link #currentV}</li>
568 * <li>Key {@link #currentKey} (used for progress reporting)</li>
569 * <li>int {@link #numKeysRead} (used for progress reporting)</li>
570 * </ul>
571 */
572 protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> {
573 protected long numKeysRead;
574 protected Iterator<Entry<Key,Value>> scannerIterator;
575 protected RangeInputSplit split;
576
577 /**
578 * Apply the configured iterators from the configuration to the scanner.
579 *
580 * @param context
581 * the Hadoop context for the configured job
582 * @param scanner
583 * the scanner to configure
584 */
585 protected void setupIterators(TaskAttemptContext context, Scanner scanner) {
586 List<IteratorSetting> iterators = getIterators(context);
587 for (IteratorSetting iterator : iterators) {
588 scanner.addScanIterator(iterator);
589 }
590 }
591
592 /**
593 * Initialize a scanner over the given input split using this task attempt configuration.
594 */
595 @Override
596 public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
597 Scanner scanner;
598 split = (RangeInputSplit) inSplit;
599 log.debug("Initializing input split: " + split.range);
600 Instance instance = getInstance(attempt);
601 String principal = getPrincipal(attempt);
602 String tokenClass = getTokenClass(attempt);
603 byte[] token = getToken(attempt);
604 Authorizations authorizations = getScanAuthorizations(attempt);
605
606 try {
607 log.debug("Creating connector with user: " + principal);
608 Connector conn = instance.getConnector(principal, CredentialHelper.extractToken(tokenClass, token));
609 log.debug("Creating scanner for table: " + getInputTableName(attempt));
610 log.debug("Authorizations are: " + authorizations);
611 if (isOfflineScan(attempt)) {
612 scanner = new OfflineScanner(instance, new Credential(principal, tokenClass, ByteBuffer.wrap(token), instance.getInstanceID()), Tables.getTableId(instance,
613 getInputTableName(attempt)), authorizations);
614 } else {
615 scanner = conn.createScanner(getInputTableName(attempt), authorizations);
616 }
617 if (isIsolated(attempt)) {
618 log.info("Creating isolated scanner");
619 scanner = new IsolatedScanner(scanner);
620 }
621 if (usesLocalIterators(attempt)) {
622 log.info("Using local iterators");
623 scanner = new ClientSideIteratorScanner(scanner);
624 }
625 setupIterators(attempt, scanner);
626 } catch (Exception e) {
627 throw new IOException(e);
628 }
629
630
631 for (Pair<Text,Text> c : getFetchedColumns(attempt)) {
632 if (c.getSecond() != null) {
633 log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
634 scanner.fetchColumn(c.getFirst(), c.getSecond());
635 } else {
636 log.debug("Fetching column family " + c.getFirst());
637 scanner.fetchColumnFamily(c.getFirst());
638 }
639 }
640
641 scanner.setRange(split.range);
642
643 numKeysRead = 0;
644
645
646 scannerIterator = scanner.iterator();
647 }
648
649 @Override
650 public void close() {}
651
652 @Override
653 public float getProgress() throws IOException {
654 if (numKeysRead > 0 && currentKey == null)
655 return 1.0f;
656 return split.getProgress(currentKey);
657 }
658
659 protected K currentK = null;
660 protected V currentV = null;
661 protected Key currentKey = null;
662 protected Value currentValue = null;
663
664 @Override
665 public K getCurrentKey() throws IOException, InterruptedException {
666 return currentK;
667 }
668
669 @Override
670 public V getCurrentValue() throws IOException, InterruptedException {
671 return currentV;
672 }
673 }
674
675 Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException,
676 AccumuloException, AccumuloSecurityException {
677
678 Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
679
680 Instance instance = getInstance(context);
681 Connector conn = instance.getConnector(getPrincipal(context), getToken(context));
682 String tableId = Tables.getTableId(instance, tableName);
683
684 if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
685 Tables.clearCache(instance);
686 if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
687 throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
688 }
689 }
690
691 for (Range range : ranges) {
692 Text startRow;
693
694 if (range.getStartKey() != null)
695 startRow = range.getStartKey().getRow();
696 else
697 startRow = new Text();
698
699 Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
700 Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
701 Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
702 scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY);
703 scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
704 scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
705 scanner.setRange(metadataRange);
706
707 RowIterator rowIter = new RowIterator(scanner);
708
709
710
711 KeyExtent lastExtent = null;
712
713 while (rowIter.hasNext()) {
714 Iterator<Entry<Key,Value>> row = rowIter.next();
715 String last = "";
716 KeyExtent extent = null;
717 String location = null;
718
719 while (row.hasNext()) {
720 Entry<Key,Value> entry = row.next();
721 Key key = entry.getKey();
722
723 if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) {
724 last = entry.getValue().toString();
725 }
726
727 if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)
728 || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
729 location = entry.getValue().toString();
730 }
731
732 if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
733 extent = new KeyExtent(key.getRow(), entry.getValue());
734 }
735
736 }
737
738 if (location != null)
739 return null;
740
741 if (!extent.getTableId().toString().equals(tableId)) {
742 throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
743 }
744
745 if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
746 throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
747 }
748
749 Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
750 if (tabletRanges == null) {
751 tabletRanges = new HashMap<KeyExtent,List<Range>>();
752 binnedRanges.put(last, tabletRanges);
753 }
754
755 List<Range> rangeList = tabletRanges.get(extent);
756 if (rangeList == null) {
757 rangeList = new ArrayList<Range>();
758 tabletRanges.put(extent, rangeList);
759 }
760
761 rangeList.add(range);
762
763 if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
764 break;
765 }
766
767 lastExtent = extent;
768 }
769
770 }
771
772 return binnedRanges;
773 }
774
775 /**
776 * Read the metadata table to get tablets and match up ranges to them.
777 */
778 @Override
779 public List<InputSplit> getSplits(JobContext context) throws IOException {
780 log.setLevel(getLogLevel(context));
781 validateOptions(context);
782
783 String tableName = getInputTableName(context);
784 boolean autoAdjust = getAutoAdjustRanges(context);
785 List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
786
787 if (ranges.isEmpty()) {
788 ranges = new ArrayList<Range>(1);
789 ranges.add(new Range());
790 }
791
792
793 Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
794 TabletLocator tl;
795 try {
796 if (isOfflineScan(context)) {
797 binnedRanges = binOfflineTable(context, tableName, ranges);
798 while (binnedRanges == null) {
799
800 UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
801 binnedRanges = binOfflineTable(context, tableName, ranges);
802 }
803 } else {
804 Instance instance = getInstance(context);
805 String tableId = null;
806 tl = getTabletLocator(context);
807
808 tl.invalidateCache();
809 while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
810 if (!(instance instanceof MockInstance)) {
811 if (tableId == null)
812 tableId = Tables.getTableId(instance, tableName);
813 if (!Tables.exists(instance, tableId))
814 throw new TableDeletedException(tableId);
815 if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
816 throw new TableOfflineException(instance, tableId);
817 }
818 binnedRanges.clear();
819 log.warn("Unable to locate bins for specified ranges. Retrying.");
820 UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
821 tl.invalidateCache();
822 }
823 }
824 } catch (Exception e) {
825 throw new IOException(e);
826 }
827
828 ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
829 HashMap<Range,ArrayList<String>> splitsToAdd = null;
830
831 if (!autoAdjust)
832 splitsToAdd = new HashMap<Range,ArrayList<String>>();
833
834 HashMap<String,String> hostNameCache = new HashMap<String,String>();
835
836 for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
837 String ip = tserverBin.getKey().split(":", 2)[0];
838 String location = hostNameCache.get(ip);
839 if (location == null) {
840 InetAddress inetAddress = InetAddress.getByName(ip);
841 location = inetAddress.getHostName();
842 hostNameCache.put(ip, location);
843 }
844
845 for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
846 Range ke = extentRanges.getKey().toDataRange();
847 for (Range r : extentRanges.getValue()) {
848 if (autoAdjust) {
849
850 splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] {location}));
851 } else {
852
853 ArrayList<String> locations = splitsToAdd.get(r);
854 if (locations == null)
855 locations = new ArrayList<String>(1);
856 locations.add(location);
857 splitsToAdd.put(r, locations);
858 }
859 }
860 }
861 }
862
863 if (!autoAdjust)
864 for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
865 splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
866 return splits;
867 }
868
869 /**
870 * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
871 */
872 public static class RangeInputSplit extends InputSplit implements Writable {
873 private Range range;
874 private String[] locations;
875
876 public RangeInputSplit() {
877 range = new Range();
878 locations = new String[0];
879 }
880
881 public RangeInputSplit(RangeInputSplit split) throws IOException {
882 this.setRange(split.getRange());
883 this.setLocations(split.getLocations());
884 }
885
886 protected RangeInputSplit(String table, Range range, String[] locations) {
887 this.range = range;
888 this.locations = locations;
889 }
890
891 public Range getRange() {
892 return range;
893 }
894
895 public void setRange(Range range) {
896 this.range = range;
897 }
898
899 private static byte[] extractBytes(ByteSequence seq, int numBytes) {
900 byte[] bytes = new byte[numBytes + 1];
901 bytes[0] = 0;
902 for (int i = 0; i < numBytes; i++) {
903 if (i >= seq.length())
904 bytes[i + 1] = 0;
905 else
906 bytes[i + 1] = seq.byteAt(i);
907 }
908 return bytes;
909 }
910
911 public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
912 int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
913 BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
914 BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
915 BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
916 return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
917 }
918
919 public float getProgress(Key currentKey) {
920 if (currentKey == null)
921 return 0f;
922 if (range.getStartKey() != null && range.getEndKey() != null) {
923 if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW)) {
924
925 return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
926 } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM)) {
927
928 return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
929 } else if (!range.getStartKey().equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
930
931 return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
932 }
933 }
934
935 return 0f;
936 }
937
938 /**
939 * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
940 */
941 @Override
942 public long getLength() throws IOException {
943 Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
944 Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
945 int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
946 long diff = 0;
947
948 byte[] start = startRow.getBytes();
949 byte[] stop = stopRow.getBytes();
950 for (int i = 0; i < maxCommon; ++i) {
951 diff |= 0xff & (start[i] ^ stop[i]);
952 diff <<= Byte.SIZE;
953 }
954
955 if (startRow.getLength() != stopRow.getLength())
956 diff |= 0xff;
957
958 return diff + 1;
959 }
960
961 @Override
962 public String[] getLocations() throws IOException {
963 return locations;
964 }
965
966 public void setLocations(String[] locations) {
967 this.locations = locations;
968 }
969
970 @Override
971 public void readFields(DataInput in) throws IOException {
972 range.readFields(in);
973 int numLocs = in.readInt();
974 locations = new String[numLocs];
975 for (int i = 0; i < numLocs; ++i)
976 locations[i] = in.readUTF();
977 }
978
979 @Override
980 public void write(DataOutput out) throws IOException {
981 range.write(out);
982 out.writeInt(locations.length);
983 for (int i = 0; i < locations.length; ++i)
984 out.writeUTF(locations[i]);
985 }
986 }
987
988
989
990
991
992 /**
993 * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead.
994 */
995 @Deprecated
996 public static void setIsolated(Configuration conf, boolean enable) {
997 InputConfigurator.setScanIsolation(CLASS, conf, enable);
998 }
999
1000 /**
1001 * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead.
1002 */
1003 @Deprecated
1004 public static void setLocalIterators(Configuration conf, boolean enable) {
1005 InputConfigurator.setLocalIterators(CLASS, conf, enable);
1006 }
1007
1008 /**
1009 * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setInputTableName(Job, String)}, and
1010 * {@link #setScanAuthorizations(Job, Authorizations)} instead.
1011 */
1012 @Deprecated
1013 public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
1014 try {
1015 InputConfigurator.setConnectorInfo(CLASS, conf, user, new PasswordToken().setPassword(passwd));
1016 } catch (AccumuloSecurityException e) {
1017 throw new RuntimeException(e);
1018 }
1019 InputConfigurator.setInputTableName(CLASS, conf, table);
1020 InputConfigurator.setScanAuthorizations(CLASS, conf, auths);
1021 }
1022
1023 /**
1024 * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
1025 */
1026 @Deprecated
1027 public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
1028 InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
1029 }
1030
1031 /**
1032 * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
1033 */
1034 @Deprecated
1035 public static void setMockInstance(Configuration conf, String instanceName) {
1036 InputConfigurator.setMockInstance(CLASS, conf, instanceName);
1037 }
1038
1039 /**
1040 * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead.
1041 */
1042 @Deprecated
1043 public static void setRanges(Configuration conf, Collection<Range> ranges) {
1044 InputConfigurator.setRanges(CLASS, conf, ranges);
1045 }
1046
1047 /**
1048 * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead.
1049 */
1050 @Deprecated
1051 public static void disableAutoAdjustRanges(Configuration conf) {
1052 InputConfigurator.setAutoAdjustRanges(CLASS, conf, false);
1053 }
1054
1055 /**
1056 * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
1057 */
1058 @Deprecated
1059 public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException {
1060 IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class);
1061 try {
1062 VersioningIterator.setMaxVersions(vers, maxVersions);
1063 } catch (IllegalArgumentException e) {
1064 throw new IOException(e);
1065 }
1066 InputConfigurator.addIterator(CLASS, conf, vers);
1067 }
1068
1069 /**
1070 * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead.
1071 */
1072 @Deprecated
1073 public static void setScanOffline(Configuration conf, boolean scanOff) {
1074 InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff);
1075 }
1076
1077 /**
1078 * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead.
1079 */
1080 @Deprecated
1081 public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
1082 InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs);
1083 }
1084
1085 /**
1086 * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
1087 */
1088 @Deprecated
1089 public static void setLogLevel(Configuration conf, Level level) {
1090 InputConfigurator.setLogLevel(CLASS, conf, level);
1091 }
1092
1093 /**
1094 * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead.
1095 */
1096 @Deprecated
1097 public static void addIterator(Configuration conf, IteratorSetting cfg) {
1098 InputConfigurator.addIterator(CLASS, conf, cfg);
1099 }
1100
1101 /**
1102 * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead.
1103 */
1104 @Deprecated
1105 protected static boolean isIsolated(Configuration conf) {
1106 return InputConfigurator.isIsolated(CLASS, conf);
1107 }
1108
1109 /**
1110 * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead.
1111 */
1112 @Deprecated
1113 protected static boolean usesLocalIterators(Configuration conf) {
1114 return InputConfigurator.usesLocalIterators(CLASS, conf);
1115 }
1116
1117 /**
1118 * @deprecated since 1.5.0; Use {@link #getPrincipal(JobContext)} instead.
1119 */
1120 @Deprecated
1121 protected static String getPrincipal(Configuration conf) {
1122 return InputConfigurator.getPrincipal(CLASS, conf);
1123 }
1124
1125 /**
1126 * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
1127 */
1128 @Deprecated
1129 protected static byte[] getToken(Configuration conf) {
1130 return InputConfigurator.getToken(CLASS, conf);
1131 }
1132
1133 /**
1134 * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead.
1135 */
1136 @Deprecated
1137 protected static String getTablename(Configuration conf) {
1138 return InputConfigurator.getInputTableName(CLASS, conf);
1139 }
1140
1141 /**
1142 * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead.
1143 */
1144 @Deprecated
1145 protected static Authorizations getAuthorizations(Configuration conf) {
1146 return InputConfigurator.getScanAuthorizations(CLASS, conf);
1147 }
1148
1149 /**
1150 * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
1151 */
1152 @Deprecated
1153 protected static Instance getInstance(Configuration conf) {
1154 return InputConfigurator.getInstance(CLASS, conf);
1155 }
1156
1157 /**
1158 * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead.
1159 */
1160 @Deprecated
1161 protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
1162 return InputConfigurator.getTabletLocator(CLASS, conf);
1163 }
1164
1165 /**
1166 * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead.
1167 */
1168 @Deprecated
1169 protected static List<Range> getRanges(Configuration conf) throws IOException {
1170 return InputConfigurator.getRanges(CLASS, conf);
1171 }
1172
1173 /**
1174 * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead.
1175 */
1176 @Deprecated
1177 protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
1178 return InputConfigurator.getFetchedColumns(CLASS, conf);
1179 }
1180
1181 /**
1182 * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead.
1183 */
1184 @Deprecated
1185 protected static boolean getAutoAdjustRanges(Configuration conf) {
1186 return InputConfigurator.getAutoAdjustRanges(CLASS, conf);
1187 }
1188
1189 /**
1190 * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
1191 */
1192 @Deprecated
1193 protected static Level getLogLevel(Configuration conf) {
1194 return InputConfigurator.getLogLevel(CLASS, conf);
1195 }
1196
1197 /**
1198 * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead.
1199 */
1200 @Deprecated
1201 protected static void validateOptions(Configuration conf) throws IOException {
1202 InputConfigurator.validateOptions(CLASS, conf);
1203 }
1204
1205 /**
1206 * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead.
1207 */
1208 @Deprecated
1209 protected static int getMaxVersions(Configuration conf) {
1210
1211
1212 List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1213 for (IteratorSetting setting : iteratorSettings) {
1214 if ("vers".equals(setting.getName()) && 1 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) {
1215 if (setting.getOptions().containsKey("maxVersions"))
1216 return Integer.parseInt(setting.getOptions().get("maxVersions"));
1217 else
1218 return -1;
1219 }
1220 }
1221 return -1;
1222 }
1223
1224 /**
1225 * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead.
1226 */
1227 @Deprecated
1228 protected static boolean isOfflineScan(Configuration conf) {
1229 return InputConfigurator.isOfflineScan(CLASS, conf);
1230 }
1231
1232 /**
1233 * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
1234 */
1235 @Deprecated
1236 protected static List<AccumuloIterator> getIterators(Configuration conf) {
1237 List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1238 List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size());
1239 for (IteratorSetting setting : iteratorSettings) {
1240 AccumuloIterator deprecatedIter = new AccumuloIterator(new String(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass()
1241 + AccumuloIterator.FIELD_SEP + setting.getName()));
1242 deprecatedIterators.add(deprecatedIter);
1243 }
1244 return deprecatedIterators;
1245 }
1246
1247 /**
1248 * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead.
1249 */
1250 @Deprecated
1251 protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) {
1252 List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf);
1253 List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size());
1254 for (IteratorSetting setting : iteratorSettings) {
1255 for (Entry<String,String> opt : setting.getOptions().entrySet()) {
1256 String deprecatedOption;
1257 try {
1258 deprecatedOption = new String(setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8")
1259 + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8"));
1260 } catch (UnsupportedEncodingException e) {
1261 throw new RuntimeException(e);
1262 }
1263 deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption));
1264 }
1265 }
1266 return deprecatedIteratorOptions;
1267 }
1268
1269 /**
1270 * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
1271 */
1272 @Deprecated
1273 static class AccumuloIterator {
1274
1275 private static final String FIELD_SEP = ":";
1276
1277 private int priority;
1278 private String iteratorClass;
1279 private String iteratorName;
1280
1281 public AccumuloIterator(int priority, String iteratorClass, String iteratorName) {
1282 this.priority = priority;
1283 this.iteratorClass = iteratorClass;
1284 this.iteratorName = iteratorName;
1285 }
1286
1287
1288 public AccumuloIterator(String iteratorSetting) {
1289
1290 StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
1291 priority = Integer.parseInt(tokenizer.nextToken());
1292 iteratorClass = tokenizer.nextToken();
1293 iteratorName = tokenizer.nextToken();
1294 }
1295
1296 public int getPriority() {
1297 return priority;
1298 }
1299
1300 public String getIteratorClass() {
1301 return iteratorClass;
1302 }
1303
1304 public String getIteratorName() {
1305 return iteratorName;
1306 }
1307
1308 @Override
1309 public String toString() {
1310 return new String(priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName);
1311 }
1312
1313 }
1314
1315 /**
1316 * @deprecated since 1.5.0; Use {@link IteratorSetting} instead.
1317 */
1318 @Deprecated
1319 static class AccumuloIteratorOption {
1320 private static final String FIELD_SEP = ":";
1321
1322 private String iteratorName;
1323 private String key;
1324 private String value;
1325
1326 public AccumuloIteratorOption(String iteratorName, String key, String value) {
1327 this.iteratorName = iteratorName;
1328 this.key = key;
1329 this.value = value;
1330 }
1331
1332
1333 public AccumuloIteratorOption(String iteratorOption) {
1334 StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
1335 this.iteratorName = tokenizer.nextToken();
1336 try {
1337 this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
1338 this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8");
1339 } catch (UnsupportedEncodingException e) {
1340 throw new RuntimeException(e);
1341 }
1342 }
1343
1344 public String getIteratorName() {
1345 return iteratorName;
1346 }
1347
1348 public String getKey() {
1349 return key;
1350 }
1351
1352 public String getValue() {
1353 return value;
1354 }
1355
1356 @Override
1357 public String toString() {
1358 try {
1359 return new String(iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8"));
1360 } catch (UnsupportedEncodingException e) {
1361 throw new RuntimeException(e);
1362 }
1363 }
1364
1365 }
1366
1367 }