/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null, null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the user.
  private static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol. It is present for
  // backwards compatibility reasons. Use it only to override the
  // per-family data block encoding read from the table descriptor.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596.
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);

    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);

    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // Create a map from column family to its configured compression algorithm,
    // bloom type, block size and data block encoding.
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        long length = kv.getLength();
        byte[] family = CellUtil.cloneFamily(kv);
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // Create a new writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Problem locating region for rowkey: " +
                    Bytes.toString(rowKey), e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Failed to get region location, so use default writer: " +
                    Bytes.toString(rowKey));
              }
              wl = getNewWriter(family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("First rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("Failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(family, conf, null);
          }
        }

        // We now have the proper writer. Full steam ahead.
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition happens.
        this.previousRow = rowKey;
      }

      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0) ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        Algorithm compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(family);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(family);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);
        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();

        if (null == favoredNodes) {
          wl.writer =
              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
        } else {
          wl.writer =
              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
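
  // Illustrative note (not part of the original source): the record writer above
  // lays out one directory per column family under the task's work path, e.g.
  // (hypothetical paths)
  //
  //   <workdir>/cf1/184d61f6...    <workdir>/cf2/bc6aa2e1...
  //
  // and a caller can force all open HFiles to roll by writing a null row/cell pair:
  //
  //   writer.write(null, null);  // closes current HFiles; next write opens new ones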

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class,
        NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
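
  // Worked example (hypothetical keys): a table whose regions cover
  // [ , "b"), ["b", "d"), ["d", ) has start keys "", "b", "d". The empty first
  // key is dropped above, so the partitions file holds just "b" and "d", and
  // TotalOrderPartitioner routes rows to reducers 0, 1 and 2 accordingly.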

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   *
   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
   */
  @Deprecated
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
  }
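
  // A minimal driver sketch for the method above (illustrative only; MyDriver,
  // MyMapper and the table/paths are hypothetical). The mapper must emit
  // ImmutableBytesWritable row keys with Put or KeyValue values:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "bulkload-prepare");
  //   job.setJarByClass(MyDriver.class);
  //   job.setMapperClass(MyMapper.class);
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(Put.class);
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //       Table table = conn.getTable(TableName.valueOf("usertable"));
  //       RegionLocator locator = conn.getRegionLocator(table.getName())) {
  //     HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
  //   }
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
  //   // After the job succeeds, complete the bulk load with LoadIncrementalHFiles.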

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      // record this table name for creating writer by favored nodes
      LOG.info("bulkload locality sensitive enabled");
      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
    }

    // Use the table's current region boundaries as the split points.
    LOG.info("Looking up current regions for table " + regionLocator.getName());
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
  }

  public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    // Set compression algorithms based on column families
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
        Algorithm>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
        BloomType>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
        Integer>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
        DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to conf value map
   * from the configuration.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }
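
  // Serialization format example (hypothetical family names and values): the
  // configure* serializers below and the createFamily*Map deserializers above
  // share one format, a URL-encoded "family=value" pair per column family,
  // joined with '&', e.g.
  //
  //   hbase.hfileoutputformat.families.compression = "cf1=GZ&cf2=NONE"
  //
  // which createFamilyConfValueMap decodes back into {cf1 -> "GZ", cf2 -> "NONE"}.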

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
      throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }
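
  // The partitions file lives under hbase.fs.tmp.dir and is registered for
  // deletion when the submitting JVM exits. setPartitionFile only records the
  // path in the job conf; each reduce task's partitioner reads the file back,
  // roughly via
  //
  //   String path = TotalOrderPartitioner.getPartitionFile(conf);
  //
  // when it initializes, then binary-searches the split points per row key.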

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param conf to persist serialized values into
   * @param tableDescriptor to read the properties from
   * @throws UnsupportedEncodingException on failure to URL-encode the values
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
      throws UnsupportedEncodingException {
    StringBuilder compressionConfigValue = new StringBuilder();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(
          familyDescriptor.getCompressionType().getName(), "UTF-8"));
    }
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
  }

  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException on failure to URL-encode the values
   */
  @VisibleForTesting
  static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
      throws UnsupportedEncodingException {
    StringBuilder blockSizeConfigValue = new StringBuilder();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        blockSizeConfigValue.append('&');
      }
      blockSizeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      blockSizeConfigValue.append('=');
      blockSizeConfigValue.append(URLEncoder.encode(
          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
    }
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
  }

  /**
   * Serialize column family to bloom filter type map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException on failure to URL-encode the values
   */
  @VisibleForTesting
  static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
      throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder bloomTypeConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        bloomTypeConfigValue.append('&');
      }
      bloomTypeConfigValue.append(URLEncoder.encode(
          familyDescriptor.getNameAsString(), "UTF-8"));
      bloomTypeConfigValue.append('=');
      // toString() never returns null, so guard the BloomType itself instead
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      String bloomType = bloomFilterType == null
          ? HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomFilterType.toString();
      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
    }
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
   * @param tableDescriptor to read the properties from
   * @param conf to persist serialized values into
   * @throws UnsupportedEncodingException on failure to URL-encode the values
   */
  @VisibleForTesting
  static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
      Configuration conf) throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
    }
    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        dataBlockEncodingConfigValue.append('&');
      }
      dataBlockEncodingConfigValue.append(
          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      dataBlockEncodingConfigValue.append('=');
      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
      if (encoding == null) {
        encoding = DataBlockEncoding.NONE;
      }
      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
          "UTF-8"));
    }
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        dataBlockEncodingConfigValue.toString());
  }
}