View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.mapreduce.lib.partition;
18  
19  import java.io.BufferedReader;
20  import java.io.FileNotFoundException;
21  import java.io.FileReader;
22  import java.io.IOException;
23  import java.net.URI;
24  import java.util.Arrays;
25  import java.util.Scanner;
26  import java.util.TreeSet;
27  
28  import org.apache.commons.codec.binary.Base64;
29  import org.apache.hadoop.conf.Configurable;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.filecache.DistributedCache;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.io.Text;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.mapreduce.Job;
36  import org.apache.hadoop.mapreduce.Partitioner;
37  
38  /**
39   * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing.
40   */
41  public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
42    private static final String PREFIX = RangePartitioner.class.getName();
43    private static final String CUTFILE_KEY = PREFIX + ".cutFile";
44    private static final String NUM_SUBBINS = PREFIX + ".subBins";
45    
46    private Configuration conf;
47    
48    @Override
49    public int getPartition(Text key, Writable value, int numPartitions) {
50      try {
51        return findPartition(key, getCutPoints(), getNumSubBins());
52      } catch (IOException e) {
53        throw new RuntimeException(e);
54      }
55    }
56    
57    int findPartition(Text key, Text[] array, int numSubBins) {
58      // find the bin for the range, and guarantee it is positive
59      int index = Arrays.binarySearch(array, key);
60      index = index < 0 ? (index + 1) * -1 : index;
61      
62      // both conditions work with numSubBins == 1, but this check is to avoid
63      // hashing, when we don't need to, for speed
64      if (numSubBins < 2)
65        return index;
66      return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
67    }
68    
69    private int _numSubBins = 0;
70    
71    private synchronized int getNumSubBins() {
72      if (_numSubBins < 1) {
73        // get number of sub-bins and guarantee it is positive
74        _numSubBins = Math.max(1, getConf().getInt(NUM_SUBBINS, 1));
75      }
76      return _numSubBins;
77    }
78    
79    private Text cutPointArray[] = null;
80    
81    private synchronized Text[] getCutPoints() throws IOException {
82      if (cutPointArray == null) {
83        String cutFileName = conf.get(CUTFILE_KEY);
84        Path[] cf = DistributedCache.getLocalCacheFiles(conf);
85        
86        if (cf != null) {
87          for (Path path : cf) {
88            if (path.toUri().getPath().endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
89              TreeSet<Text> cutPoints = new TreeSet<Text>();
90              Scanner in = new Scanner(new BufferedReader(new FileReader(path.toString())));
91              try {
92                while (in.hasNextLine())
93                  cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes())));
94              } finally {
95                in.close();
96              }
97              cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
98              break;
99            }
100         }
101       }
102       if (cutPointArray == null)
103         throw new FileNotFoundException(cutFileName + " not found in distributed cache");
104     }
105     return cutPointArray;
106   }
107   
108   @Override
109   public Configuration getConf() {
110     return conf;
111   }
112   
113   @Override
114   public void setConf(Configuration conf) {
115     this.conf = conf;
116   }
117   
118   /**
119    * Sets the hdfs file name to use, containing a newline separated list of Base64 encoded split points that represent ranges for partitioning
120    */
121   public static void setSplitFile(Job job, String file) {
122     URI uri = new Path(file).toUri();
123     DistributedCache.addCacheFile(uri, job.getConfiguration());
124     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
125   }
126   
127   /**
128    * Sets the number of random sub-bins per range
129    */
130   public static void setNumSubBins(Job job, int num) {
131     job.getConfiguration().setInt(NUM_SUBBINS, num);
132   }
133 }