/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.core.client.mapreduce.lib.partition;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Scanner;
import java.util.TreeSet;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing.
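 * <p>
 * A typical driver configuration might look like the following sketch; the split-file path,
 * sub-bin count, and reducer count are illustrative only:
 *
 * <pre>
 * {@code
 * Job job = new Job(new Configuration());
 * job.setPartitionerClass(RangePartitioner.class);
 * RangePartitioner.setSplitFile(job, "/tmp/splits.txt");
 * RangePartitioner.setNumSubBins(job, 2);
 * // with n split points and k sub-bins, keys map to (n + 1) * k partitions
 * job.setNumReduceTasks(8);
 * }
 * </pre>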
 */
public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
  private static final String PREFIX = RangePartitioner.class.getName();
  private static final String CUTFILE_KEY = PREFIX + ".cutFile";
  private static final String NUM_SUBBINS = PREFIX + ".subBins";

  private Configuration conf;

  @Override
  public int getPartition(Text key, Writable value, int numPartitions) {
    try {
      return findPartition(key, getCutPoints(), getNumSubBins());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  int findPartition(Text key, Text[] array, int numSubBins) {
    // binarySearch returns -(insertionPoint) - 1 for a key that is not an exact match,
    // so negate and shift a negative result to recover the insertion point
    int index = Arrays.binarySearch(array, key);
    index = index < 0 ? (index + 1) * -1 : index;

    if (numSubBins < 2)
      return index;
    // spread each range over numSubBins partitions by hashing the key; the mask with
    // Integer.MAX_VALUE keeps the hash code non-negative
    return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
  }

  private int _numSubBins = 0;

  private synchronized int getNumSubBins() {
    if (_numSubBins < 1) {
      // get number of sub-bins and guarantee it is positive
      _numSubBins = Math.max(1, getConf().getInt(NUM_SUBBINS, 1));
    }
    return _numSubBins;
  }

  private Text[] cutPointArray = null;

  private synchronized Text[] getCutPoints() throws IOException {
    if (cutPointArray == null) {
      String cutFileName = conf.get(CUTFILE_KEY);
      Path[] cf = DistributedCache.getLocalCacheFiles(conf);

      if (cf != null) {
        // find the localized copy of the cut file by matching on its base name
        for (Path path : cf) {
          if (path.toUri().getPath().endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
            TreeSet<Text> cutPoints = new TreeSet<Text>();
            Scanner in = new Scanner(new BufferedReader(new FileReader(path.toString())));
            try {
              // each line holds one Base64-encoded split point; the TreeSet sorts and de-duplicates
              while (in.hasNextLine())
                cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes())));
            } finally {
              in.close();
            }
            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
            break;
          }
        }
      }
      if (cutPointArray == null)
        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
    }
    return cutPointArray;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Sets the HDFS file name to use, containing a newline-separated list of Base64-encoded split points that represent ranges for partitioning.
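   * <p>
   * A minimal sketch of producing such a file; the path, split values, and use of the platform
   * default charset are illustrative assumptions:
   *
   * <pre>
   * {@code
   * FileSystem fs = FileSystem.get(job.getConfiguration());
   * PrintStream out = new PrintStream(fs.create(new Path("/tmp/splits.txt")));
   * for (String split : new String[] {"f", "m", "t"})
   *   out.println(new String(Base64.encodeBase64(split.getBytes())));
   * out.close();
   * }
   * </pre>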
   */
  public static void setSplitFile(Job job, String file) {
    URI uri = new Path(file).toUri();
    DistributedCache.addCacheFile(uri, job.getConfiguration());
    job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
  }

  /**
   * Sets the number of random sub-bins per range.
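   * <p>
   * With {@code n} split points and {@code k} sub-bins, keys map to {@code (n + 1) * k} partitions,
   * so the job's reduce task count should be set to match (e.g., 3 split points and 2 sub-bins
   * yield 8 partitions).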
   */
  public static void setNumSubBins(Job job, int num) {
    job.getConfiguration().setInt(NUM_SUBBINS, num);
  }
}