1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceStability;
29 import org.apache.hadoop.hbase.client.Connection;
30 import org.apache.hadoop.hbase.client.ConnectionFactory;
31 import org.apache.hadoop.hbase.client.RegionLocator;
32 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.mapred.JobConf;
35 import org.apache.hadoop.mapred.Partitioner;
36
37
38
39
40
41
42
43
44
45
46 @InterfaceAudience.Public
47 @InterfaceStability.Stable
48 public class HRegionPartitioner<K2,V2>
49 implements Partitioner<ImmutableBytesWritable, V2> {
50 private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
51
52 private Connection connection;
53 private RegionLocator locator;
54 private byte[][] startKeys;
55
56 public void configure(JobConf job) {
57 try {
58 this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
59 TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE));
60 this.locator = this.connection.getRegionLocator(tableName);
61 } catch (IOException e) {
62 LOG.error(e);
63 }
64
65 try {
66 this.startKeys = this.locator.getStartKeys();
67 } catch (IOException e) {
68 LOG.error(e);
69 }
70 }
71
72 public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) {
73 byte[] region = null;
74
75 if (this.startKeys.length == 1){
76 return 0;
77 }
78 try {
79
80
81 region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
82 } catch (IOException e) {
83 LOG.error(e);
84 }
85 for (int i = 0; i < this.startKeys.length; i++){
86 if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
87 if (i >= numPartitions-1){
88
89 return (Integer.toString(i).hashCode()
90 & Integer.MAX_VALUE) % numPartitions;
91 }
92 return i;
93 }
94 }
95
96 return 0;
97 }
98 }