1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.UnsupportedEncodingException;
22  import java.net.InetSocketAddress;
23  import java.net.URLDecoder;
24  import java.net.URLEncoder;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.classification.InterfaceStability;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellComparator;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionLocation;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.client.Connection;
51  import org.apache.hadoop.hbase.client.ConnectionFactory;
52  import org.apache.hadoop.hbase.client.HTable;
53  import org.apache.hadoop.hbase.client.Put;
54  import org.apache.hadoop.hbase.fs.HFileSystem;
55  import org.apache.hadoop.hbase.client.RegionLocator;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
58  import org.apache.hadoop.hbase.io.compress.Compression;
59  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
60  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
61  import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
62  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
63  import org.apache.hadoop.hbase.io.hfile.HFile;
64  import org.apache.hadoop.hbase.io.hfile.HFileContext;
65  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
66  import org.apache.hadoop.hbase.regionserver.BloomType;
67  import org.apache.hadoop.hbase.regionserver.HStore;
68  import org.apache.hadoop.hbase.regionserver.StoreFile;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.io.NullWritable;
71  import org.apache.hadoop.io.SequenceFile;
72  import org.apache.hadoop.io.Text;
73  import org.apache.hadoop.mapreduce.Job;
74  import org.apache.hadoop.mapreduce.OutputFormat;
75  import org.apache.hadoop.mapreduce.RecordWriter;
76  import org.apache.hadoop.mapreduce.TaskAttemptContext;
77  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
78  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
79  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
80  
81  import com.google.common.annotations.VisibleForTesting;
82  
83  /**
84   * Writes HFiles. Passed Cells must arrive in order.
85   * Writes current time as the sequence id for the file. Sets the major compacted
86   * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
87   * all HFiles being written.
88   * <p>
89   * Using this class as part of a MapReduce job is best done
90   * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
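     * <p>
     * A minimal driver sketch (illustrative only; {@code MyDriver}, {@code MyMapper}, the table
     * name and the output path are placeholders, not part of this class):
     * <pre>{@code
     * Configuration conf = HBaseConfiguration.create();
     * Job job = Job.getInstance(conf, "hfile-prepare");
     * job.setJarByClass(MyDriver.class);
     * job.setMapperClass(MyMapper.class);  // emits ImmutableBytesWritable / Put pairs
     * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     * job.setMapOutputValueClass(Put.class);
     * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
     * try (Connection conn = ConnectionFactory.createConnection(conf);
     *      Table table = conn.getTable(TableName.valueOf("my_table"));
     *      RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
     *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
     * }
     * job.waitForCompletion(true);
     * }</pre>
     * The generated HFiles can then be bulk loaded into the table, for example with
     * {@code LoadIncrementalHFiles}.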
91   */
92  @InterfaceAudience.Public
93  @InterfaceStability.Evolving
94  public class HFileOutputFormat2
95      extends FileOutputFormat<ImmutableBytesWritable, Cell> {
96    private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
97  
98    // The following constants are private since they are used by
99    // HFileOutputFormat2 to internally transfer data, via the conf, between
100   // job setup and the reducer run.
101   // These should not be changed by the client.
102   private static final String COMPRESSION_FAMILIES_CONF_KEY =
103       "hbase.hfileoutputformat.families.compression";
104   private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
105       "hbase.hfileoutputformat.families.bloomtype";
106   private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
107       "hbase.mapreduce.hfileoutputformat.blocksize";
108   private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
109       "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
110 
111   // This constant is public so that clients can set it on their conf object
112   // by referring to this symbol.
113   // It is present for backwards-compatibility reasons. Use it only to
114   // override the auto-detection of data block encoding.
115   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
116       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
117 
118   /**
119    * Keep locality while generating HFiles for bulkload. See HBASE-12596
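       * Enabled by default; set this key to {@code false} in the job configuration to
       * disable locality-sensitive HFile writing.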
120    */
121   public static final String LOCALITY_SENSITIVE_CONF_KEY =
122       "hbase.bulkload.locality.sensitive.enabled";
123   private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
124   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
125       "hbase.mapreduce.hfileoutputformat.table.name";
126 
127   @Override
128   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
129       final TaskAttemptContext context) throws IOException, InterruptedException {
130     return createRecordWriter(context);
131   }
132 
133   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
134       createRecordWriter(final TaskAttemptContext context)
135           throws IOException {
136 
137     // Get the path of the temporary output file
138     final Path outputPath = FileOutputFormat.getOutputPath(context);
139     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
140     final Configuration conf = context.getConfiguration();
141     final FileSystem fs = outputdir.getFileSystem(conf);
142     // These configs are from hbase-*.xml
143     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
144         HConstants.DEFAULT_MAX_FILE_SIZE);
145     // Invented config.  Add to hbase-*.xml if other than default compression.
146     final String defaultCompressionStr = conf.get("hfile.compression",
147         Compression.Algorithm.NONE.getName());
148     final Algorithm defaultCompression = HFileWriterImpl
149         .compressionByName(defaultCompressionStr);
150     final boolean compactionExclude = conf.getBoolean(
151         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
152 
153     // create a map from column family to the compression algorithm
154     final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
155     final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
156     final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
157 
158     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
159     final Map<byte[], DataBlockEncoding> datablockEncodingMap
160         = createFamilyDataBlockEncodingMap(conf);
161     final DataBlockEncoding overriddenEncoding;
162     if (dataBlockEncodingStr != null) {
163       overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
164     } else {
165       overriddenEncoding = null;
166     }
167 
168     return new RecordWriter<ImmutableBytesWritable, V>() {
169       // Map of families to writers and how much has been output on the writer.
170       private final Map<byte [], WriterLength> writers =
171         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
172       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
173       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
174       private boolean rollRequested = false;
175 
176       @Override
177       public void write(ImmutableBytesWritable row, V cell)
178           throws IOException {
179         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
180 
181         // null input == user explicitly wants to flush
182         if (row == null && kv == null) {
183           rollWriters();
184           return;
185         }
186 
187         byte [] rowKey = CellUtil.cloneRow(kv);
188         long length = kv.getLength();
189         byte [] family = CellUtil.cloneFamily(kv);
190         WriterLength wl = this.writers.get(family);
191 
192         // If this is a new column family, make sure its output directory exists
193         if (wl == null) {
194           fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
195         }
196 
197         // If any of the HFiles for the column families has reached
198         // maxsize, we need to roll all the writers
199         if (wl != null && wl.written + length >= maxsize) {
200           this.rollRequested = true;
201         }
202 
203         // Rolling can only happen once a row is finished, though
204         if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
205           rollWriters();
206         }
207 
208         // create a new HFile writer, if necessary
209         if (wl == null || wl.writer == null) {
210           if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
211             HRegionLocation loc = null;
212             String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
213             if (tableName != null) {
214               try (Connection connection = ConnectionFactory.createConnection(conf);
215                      RegionLocator locator =
216                        connection.getRegionLocator(TableName.valueOf(tableName))) {
217                 loc = locator.getRegionLocation(rowKey);
218               } catch (Throwable e) {
219                 LOG.warn("Something went wrong locating rowkey: " +
220                   Bytes.toString(rowKey), e);
221                 loc = null;
222               }
223             }
224 
225             if (null == loc) {
226               if (LOG.isTraceEnabled()) {
227                 LOG.trace("failed to get region location, so use default writer: " +
228                   Bytes.toString(rowKey));
229               }
230               wl = getNewWriter(family, conf, null);
231             } else {
232               if (LOG.isDebugEnabled()) {
233                 LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
234               }
235               InetSocketAddress initialIsa =
236                   new InetSocketAddress(loc.getHostname(), loc.getPort());
237               if (initialIsa.isUnresolved()) {
238                 if (LOG.isTraceEnabled()) {
239                   LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
240                       + loc.getPort() + ", so use default writer");
241                 }
242                 wl = getNewWriter(family, conf, null);
243               } else {
244               if (LOG.isDebugEnabled()) {
245                   LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
246                 }
247                 wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
248               }
249             }
250           } else {
251             wl = getNewWriter(family, conf, null);
252           }
253         }
254 
255         // we now have the proper HFile writer. full steam ahead
256         kv.updateLatestStamp(this.now);
257         wl.writer.append(kv);
258         wl.written += length;
259 
260         // Remember the row so we can detect a row transition.
261         this.previousRow = rowKey;
262       }
263 
264       private void rollWriters() throws IOException {
265         for (WriterLength wl : this.writers.values()) {
266           if (wl.writer != null) {
267             LOG.info("Writer=" + wl.writer.getPath() +
268                 ((wl.written == 0)? "": ", wrote=" + wl.written));
269             close(wl.writer);
270           }
271           wl.writer = null;
272           wl.written = 0;
273         }
274         this.rollRequested = false;
275       }
276 
277       /* Create a new StoreFile.Writer.
278        * @param family
279        * @return A WriterLength, containing a new StoreFile.Writer.
280        * @throws IOException
281        */
282       @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
283           justification="Not important")
284       private WriterLength getNewWriter(byte[] family, Configuration conf,
285           InetSocketAddress[] favoredNodes) throws IOException {
286         WriterLength wl = new WriterLength();
287         Path familydir = new Path(outputdir, Bytes.toString(family));
288         Algorithm compression = compressionMap.get(family);
289         compression = compression == null ? defaultCompression : compression;
290         BloomType bloomType = bloomTypeMap.get(family);
291         bloomType = bloomType == null ? BloomType.NONE : bloomType;
292         Integer blockSize = blockSizeMap.get(family);
293         blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
294         DataBlockEncoding encoding = overriddenEncoding;
295         encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
296         encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
297         Configuration tempConf = new Configuration(conf);
298         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
299         HFileContextBuilder contextBuilder = new HFileContextBuilder()
300                                     .withCompression(compression)
301                                     .withChecksumType(HStore.getChecksumType(conf))
302                                     .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
303                                     .withBlockSize(blockSize);
304         contextBuilder.withDataBlockEncoding(encoding);
305         HFileContext hFileContext = contextBuilder.build();
306                                     
307         if (null == favoredNodes) {
308           wl.writer =
309               new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
310                   .withOutputDir(familydir).withBloomType(bloomType)
311                   .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
312         } else {
313           wl.writer =
314               new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
315                   .withOutputDir(familydir).withBloomType(bloomType)
316                   .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
317                   .withFavoredNodes(favoredNodes).build();
318         }
319 
320         this.writers.put(family, wl);
321         return wl;
322       }
323 
324       private void close(final StoreFile.Writer w) throws IOException {
325         if (w != null) {
326           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
327               Bytes.toBytes(System.currentTimeMillis()));
328           w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
329               Bytes.toBytes(context.getTaskAttemptID().toString()));
330           w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
331               Bytes.toBytes(true));
332           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
333               Bytes.toBytes(compactionExclude));
334           w.appendTrackedTimestampsToMetadata();
335           w.close();
336         }
337       }
338 
339       @Override
340       public void close(TaskAttemptContext c)
341       throws IOException, InterruptedException {
342         for (WriterLength wl: this.writers.values()) {
343           close(wl.writer);
344         }
345       }
346     };
347   }
348 
349   /*
350    * Data structure to hold a Writer and amount of data written on it.
351    */
352   static class WriterLength {
353     long written = 0;
354     StoreFile.Writer writer = null;
355   }
356 
357   /**
358    * Return the start keys of all of the regions in this table,
359    * as a list of ImmutableBytesWritable.
360    */
361   private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
362   throws IOException {
363     byte[][] byteKeys = table.getStartKeys();
364     ArrayList<ImmutableBytesWritable> ret =
365       new ArrayList<ImmutableBytesWritable>(byteKeys.length);
366     for (byte[] byteKey : byteKeys) {
367       ret.add(new ImmutableBytesWritable(byteKey));
368     }
369     return ret;
370   }
371 
372   /**
373    * Write out a {@link SequenceFile} containing the split points in startKeys,
374    * in a form that can be read by {@link TotalOrderPartitioner}.
375    */
376   @SuppressWarnings("deprecation")
377   private static void writePartitions(Configuration conf, Path partitionsPath,
378       List<ImmutableBytesWritable> startKeys) throws IOException {
379     LOG.info("Writing partition information to " + partitionsPath);
380     if (startKeys.isEmpty()) {
381       throw new IllegalArgumentException("No regions passed");
382     }
383 
384     // We're generating a list of split points, and we don't ever
385     // have keys less than the first region's start key (which is empty),
386     // so we need to remove it. Otherwise we would end up with an
387     // empty reducer with index 0.
388     TreeSet<ImmutableBytesWritable> sorted =
389       new TreeSet<ImmutableBytesWritable>(startKeys);
390 
391     ImmutableBytesWritable first = sorted.first();
392     if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
393       throw new IllegalArgumentException(
394           "First region of table should have empty start key. Instead has: "
395           + Bytes.toStringBinary(first.get()));
396     }
397     sorted.remove(first);
398 
399     // Write the actual file
400     FileSystem fs = partitionsPath.getFileSystem(conf);
401     SequenceFile.Writer writer = SequenceFile.createWriter(
402       fs, conf, partitionsPath, ImmutableBytesWritable.class,
403       NullWritable.class);
404 
405     try {
406       for (ImmutableBytesWritable startKey : sorted) {
407         writer.append(startKey, NullWritable.get());
408       }
409     } finally {
410       writer.close();
411     }
412   }
413 
414   /**
415    * Configure a MapReduce Job to perform an incremental load into the given
416    * table. This
417    * <ul>
418    *   <li>Inspects the table to configure a total order partitioner</li>
419    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
420    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
421    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
422    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
423    *     PutSortReducer)</li>
424    * </ul>
425    * The user should be sure to set the map output value class to either KeyValue or Put before
426    * running this function.
427    * 
428    * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
429    */
430   @Deprecated
431   public static void configureIncrementalLoad(Job job, HTable table)
432       throws IOException {
433     configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
434   }
435 
436   /**
437    * Configure a MapReduce Job to perform an incremental load into the given
438    * table. This
439    * <ul>
440    *   <li>Inspects the table to configure a total order partitioner</li>
441    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
442    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
443    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
444    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
445    *     PutSortReducer)</li>
446    * </ul>
447    * The user should be sure to set the map output value class to either KeyValue or Put before
448    * running this function.
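       * <p>
       * For example, an illustrative fragment of job setup might look like:
       * <pre>{@code
       * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       * job.setMapOutputValueClass(Put.class);
       * HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
       * }</pre>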
449    */
450   public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
451       throws IOException {
452     configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
453   }
454 
455   /**
456    * Configure a MapReduce Job to perform an incremental load into the given
457    * table. This
458    * <ul>
459    *   <li>Inspects the table to configure a total order partitioner</li>
460    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
461    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
462    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
463    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
464    *     PutSortReducer)</li>
465    * </ul>
466    * The user should be sure to set the map output value class to either KeyValue or Put before
467    * running this function.
468    */
469   public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
470       RegionLocator regionLocator) throws IOException {
471     configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
472   }
473 
474   static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
475       RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
476       UnsupportedEncodingException {
477     Configuration conf = job.getConfiguration();
478     job.setOutputKeyClass(ImmutableBytesWritable.class);
479     job.setOutputValueClass(KeyValue.class);
480     job.setOutputFormatClass(cls);
481 
482     // Based on the configured map output class, set the correct reducer to properly
483     // sort the incoming values.
484     // TODO it would be nice to pick one or the other of these formats.
485     if (KeyValue.class.equals(job.getMapOutputValueClass())) {
486       job.setReducerClass(KeyValueSortReducer.class);
487     } else if (Put.class.equals(job.getMapOutputValueClass())) {
488       job.setReducerClass(PutSortReducer.class);
489     } else if (Text.class.equals(job.getMapOutputValueClass())) {
490       job.setReducerClass(TextSortReducer.class);
491     } else {
492       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
493     }
494 
495     conf.setStrings("io.serializations", conf.get("io.serializations"),
496         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
497         KeyValueSerialization.class.getName());
498 
499     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
500       // Record the table name so the writer can be created with favored nodes
501       LOG.info("bulkload locality sensitive enabled");
502       conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
503     }
504 
505     // Use the table's region boundaries as the TotalOrderPartitioner split points.
506     LOG.info("Looking up current regions for table " + regionLocator.getName());
507     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
508     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
509         "to match current region count");
510     job.setNumReduceTasks(startKeys.size());
511 
512     configurePartitioner(job, startKeys);
513     // Set compression algorithms based on column families
514     configureCompression(conf, tableDescriptor);
515     configureBloomType(tableDescriptor, conf);
516     configureBlockSize(tableDescriptor, conf);
517     configureDataBlockEncoding(tableDescriptor, conf);
518 
519     TableMapReduceUtil.addDependencyJars(job);
520     TableMapReduceUtil.initCredentials(job);
521     LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
522   }
523   
524   public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
525       IOException {
526     Configuration conf = job.getConfiguration();
527 
528     job.setOutputKeyClass(ImmutableBytesWritable.class);
529     job.setOutputValueClass(KeyValue.class);
530     job.setOutputFormatClass(HFileOutputFormat2.class);
531 
532     // Set compression algorithms based on column families
533     configureCompression(conf, tableDescriptor);
534     configureBloomType(tableDescriptor, conf);
535     configureBlockSize(tableDescriptor, conf);
536     configureDataBlockEncoding(tableDescriptor, conf);
537 
538     TableMapReduceUtil.addDependencyJars(job);
539     TableMapReduceUtil.initCredentials(job);
540     LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
541   }
542 
543   /**
544    * Runs inside the task to deserialize column family to compression algorithm
545    * map from the configuration.
546    *
547    * @param conf to read the serialized values from
548    * @return a map from column family to the configured compression algorithm
549    */
550   @VisibleForTesting
551   static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
552       conf) {
553     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
554         COMPRESSION_FAMILIES_CONF_KEY);
555     Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
556         Algorithm>(Bytes.BYTES_COMPARATOR);
557     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
558       Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
559       compressionMap.put(e.getKey(), algorithm);
560     }
561     return compressionMap;
562   }
563 
564   /**
565    * Runs inside the task to deserialize column family to bloom filter type
566    * map from the configuration.
567    *
568    * @param conf to read the serialized values from
569    * @return a map from column family to the configured bloom filter type
570    */
571   @VisibleForTesting
572   static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
573     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
574         BLOOM_TYPE_FAMILIES_CONF_KEY);
575     Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
576         BloomType>(Bytes.BYTES_COMPARATOR);
577     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
578       BloomType bloomType = BloomType.valueOf(e.getValue());
579       bloomTypeMap.put(e.getKey(), bloomType);
580     }
581     return bloomTypeMap;
582   }
583 
584   /**
585    * Runs inside the task to deserialize column family to block size
586    * map from the configuration.
587    *
588    * @param conf to read the serialized values from
589    * @return a map from column family to the configured block size
590    */
591   @VisibleForTesting
592   static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
593     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
594         BLOCK_SIZE_FAMILIES_CONF_KEY);
595     Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
596         Integer>(Bytes.BYTES_COMPARATOR);
597     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
598       Integer blockSize = Integer.parseInt(e.getValue());
599       blockSizeMap.put(e.getKey(), blockSize);
600     }
601     return blockSizeMap;
602   }
603 
604   /**
605    * Runs inside the task to deserialize column family to data block encoding
606    * type map from the configuration.
607    *
608    * @param conf to read the serialized values from
609    * @return a map from column family to the configured data block encoding
610    *         for the family
611    */
612   @VisibleForTesting
613   static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
614       Configuration conf) {
615     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
616         DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
617     Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
618         DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
619     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
620       encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
621     }
622     return encoderMap;
623   }
624 
625 
626   /**
627    * Runs inside the task to deserialize a column family to value map for the given conf key.
628    *
629    * @param conf to read the serialized values from
630    * @param confName conf key to read from the configuration
631    * @return a map of column family to the given configuration value
632    */
633   private static Map<byte[], String> createFamilyConfValueMap(
634       Configuration conf, String confName) {
635     Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
636     String confVal = conf.get(confName, "");
637     for (String familyConf : confVal.split("&")) {
638       String[] familySplit = familyConf.split("=");
639       if (familySplit.length != 2) {
640         continue;
641       }
642       try {
643         confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
644             URLDecoder.decode(familySplit[1], "UTF-8"));
645       } catch (UnsupportedEncodingException e) {
646         // will not happen with UTF-8 encoding
647         throw new AssertionError(e);
648       }
649     }
650     return confValMap;
651   }
652 
653   /**
654    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
655    * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
656    */
657   static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
658       throws IOException {
659     Configuration conf = job.getConfiguration();
660     // create the partitions file
661     FileSystem fs = FileSystem.get(conf);
662     Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID());
663     partitionsPath = fs.makeQualified(partitionsPath);
664     writePartitions(conf, partitionsPath, splitPoints);
665     fs.deleteOnExit(partitionsPath);
666 
667     // configure job to use it
668     job.setPartitionerClass(TotalOrderPartitioner.class);
669     TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
670   }
671 
672   /**
673    * Serialize column family to compression algorithm map to configuration.
674    * Invoked while configuring the MR job for incremental load.
675    *
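       * For example (illustrative): a table with family {@code f1} using GZ compression and
       * family {@code f2} using no compression is serialized roughly as {@code f1=gz&f2=none},
       * with each family name and value URL-encoded.
       *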
676    * @param tableDescriptor to read the properties from
677    * @param conf to persist serialized values into
678    * @throws IOException
679    *           on failure to read column family descriptors
680    */
681   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
682       value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
683   @VisibleForTesting
684   static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
685       throws UnsupportedEncodingException {
686     StringBuilder compressionConfigValue = new StringBuilder();
687     if (tableDescriptor == null) {
688       // could happen with mock table instance
689       return;
690     }
691     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
692     int i = 0;
693     for (HColumnDescriptor familyDescriptor : families) {
694       if (i++ > 0) {
695         compressionConfigValue.append('&');
696       }
697       compressionConfigValue.append(URLEncoder.encode(
698         familyDescriptor.getNameAsString(), "UTF-8"));
699       compressionConfigValue.append('=');
700       compressionConfigValue.append(URLEncoder.encode(
701         familyDescriptor.getCompressionType().getName(), "UTF-8"));
702     }
703     // Write the serialized family-to-compression map into the configuration.
704     conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
705   }
706 
707   /**
708    * Serialize column family to block size map to configuration.
709    * Invoked while configuring the MR job for incremental load.
710    * @param tableDescriptor to read the properties from
711    * @param conf to persist serialized values into
712    *
713    * @throws IOException
714    *           on failure to read column family descriptors
715    */
716   @VisibleForTesting
717   static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
718       throws UnsupportedEncodingException {
719     StringBuilder blockSizeConfigValue = new StringBuilder();
720     if (tableDescriptor == null) {
721       // could happen with mock table instance
722       return;
723     }
724     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
725     int i = 0;
726     for (HColumnDescriptor familyDescriptor : families) {
727       if (i++ > 0) {
728         blockSizeConfigValue.append('&');
729       }
730       blockSizeConfigValue.append(URLEncoder.encode(
731           familyDescriptor.getNameAsString(), "UTF-8"));
732       blockSizeConfigValue.append('=');
733       blockSizeConfigValue.append(URLEncoder.encode(
734           String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
735     }
736     // Write the serialized family-to-block-size map into the configuration.
737     conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
738   }
739 
740   /**
741    * Serialize column family to bloom type map to configuration.
742    * Invoked while configuring the MR job for incremental load.
743    * @param tableDescriptor to read the properties from
744    * @param conf to persist serialized values into
745    *
746    * @throws IOException
747    *           on failure to read column family descriptors
748    */
749   @VisibleForTesting
750   static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
751       throws UnsupportedEncodingException {
752     if (tableDescriptor == null) {
753       // could happen with mock table instance
754       return;
755     }
756     StringBuilder bloomTypeConfigValue = new StringBuilder();
757     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
758     int i = 0;
759     for (HColumnDescriptor familyDescriptor : families) {
760       if (i++ > 0) {
761         bloomTypeConfigValue.append('&');
762       }
763       bloomTypeConfigValue.append(URLEncoder.encode(
764         familyDescriptor.getNameAsString(), "UTF-8"));
765       bloomTypeConfigValue.append('=');
766       String bloomType = familyDescriptor.getBloomFilterType().toString();
767       if (bloomType == null) {
768         bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
769       }
770       bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
771     }
772     conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
773   }
774 
775   /**
776    * Serialize column family to data block encoding map to configuration.
777    * Invoked while configuring the MR job for incremental load.
778    *
779    * @param tableDescriptor to read the properties from
780    * @param conf to persist serialized values into
781    * @throws IOException
782    *           on failure to read column family descriptors
783    */
784   @VisibleForTesting
785   static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
786       Configuration conf) throws UnsupportedEncodingException {
787     if (tableDescriptor == null) {
788       // could happen with mock table instance
789       return;
790     }
791     StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
792     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
793     int i = 0;
794     for (HColumnDescriptor familyDescriptor : families) {
795       if (i++ > 0) {
796         dataBlockEncodingConfigValue.append('&');
797       }
798       dataBlockEncodingConfigValue.append(
799           URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
800       dataBlockEncodingConfigValue.append('=');
801       DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
802       if (encoding == null) {
803         encoding = DataBlockEncoding.NONE;
804       }
805       dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
806           "UTF-8"));
807     }
808     conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
809         dataBlockEncodingConfigValue.toString());
810   }
811 }