Coverage Report - org.apache.giraph.graph.partition.HashMasterPartitioner
 
Classes in this File Line Coverage Branch Coverage Complexity
HashMasterPartitioner
87%
28/32
58%
7/12
2.167
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph.partition;
 20  
 
 21  
 import java.util.ArrayList;
 22  
 import java.util.Collection;
 23  
 import java.util.Iterator;
 24  
 import java.util.List;
 25  
 
 26  
 import org.apache.giraph.graph.WorkerInfo;
 27  
 import org.apache.hadoop.conf.Configuration;
 28  
 import org.apache.hadoop.io.Writable;
 29  
 import org.apache.hadoop.io.WritableComparable;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 /**
 33  
  * Master will execute a hash based partitioning.
 34  
  *
 35  
  * @param <I> Vertex index value
 36  
  * @param <V> Vertex value
 37  
  * @param <E> Edge value
 38  
  * @param <M> Message value
 39  
  */
 40  
 @SuppressWarnings("rawtypes")
 41  
 public class HashMasterPartitioner<I extends WritableComparable,
 42  
     V extends Writable, E extends Writable, M extends Writable> implements
 43  
     MasterGraphPartitioner<I, V, E, M> {
 44  
   /** Multiplier for the current workers squared */
 45  
   public static final String PARTITION_COUNT_MULTIPLIER =
 46  
     "hash.masterPartitionCountMultipler";
 47  
   /** Default multiplier for current workers squared */
 48  
   public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
 49  
   /** Overrides default partition count calculation if not -1 */
 50  
   public static final String USER_PARTITION_COUNT =
 51  
     "hash.userPartitionCount";
 52  
   /** Default user partition count */
 53  
   public static final int DEFAULT_USER_PARTITION_COUNT = -1;
 54  
   /** Class logger */
 55  1
   private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
 56  
   /**
 57  
    * ZooKeeper limits the data in a single znode to 1 MB and
 58  
    * each entry can be, on average, somewhat more than 300 bytes
 59  
    */
 60  
   private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
 61  
   /** Provided configuration */
 62  
   private Configuration conf;
 63  
   /** Specified partition count (overrides calculation) */
 64  
   private final int userPartitionCount;
 65  
   /** Partition count (calculated in createInitialPartitionOwners) */
 66  24
   private int partitionCount = -1;
 67  
   /** Save the last generated partition owner list */
 68  
   private List<PartitionOwner> partitionOwnerList;
 69  
 
 70  
   /**
 71  
    * Constructor.
 72  
    *
 73  
    * @param conf Configuration used.
 74  
    */
 75  24
   public HashMasterPartitioner(Configuration conf) {
 76  24
     this.conf = conf;
 77  24
     userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
 78  
         DEFAULT_USER_PARTITION_COUNT);
 79  24
   }
 80  
 
 81  
   @Override
 82  
   public Collection<PartitionOwner> createInitialPartitionOwners(
 83  
       Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
 84  24
     if (availableWorkerInfos.isEmpty()) {
 85  0
       throw new IllegalArgumentException(
 86  
           "createInitialPartitionOwners: No available workers");
 87  
     }
 88  24
     List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
 89  24
     Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
 90  24
     if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
 91  24
       float multiplier = conf.getFloat(
 92  
           PARTITION_COUNT_MULTIPLIER,
 93  
           DEFAULT_PARTITION_COUNT_MULTIPLIER);
 94  24
       partitionCount =
 95  
           Math.max((int) (multiplier * availableWorkerInfos.size() *
 96  
               availableWorkerInfos.size()),
 97  
               1);
 98  24
     } else {
 99  0
       partitionCount = userPartitionCount;
 100  
     }
 101  24
     if (LOG.isInfoEnabled()) {
 102  24
       LOG.info("createInitialPartitionOwners: Creating " +
 103  
         partitionCount + ", default would have been " +
 104  
         (availableWorkerInfos.size() *
 105  
          availableWorkerInfos.size()) + " partitions.");
 106  
     }
 107  24
     if (partitionCount > MAX_PARTTIONS) {
 108  0
       LOG.warn("createInitialPartitionOwners: " +
 109  
           "Reducing the partitionCount to " + MAX_PARTTIONS +
 110  
           " from " + partitionCount);
 111  0
       partitionCount = MAX_PARTTIONS;
 112  
     }
 113  
 
 114  48
     for (int i = 0; i < partitionCount; ++i) {
 115  24
       PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
 116  24
       if (!workerIt.hasNext()) {
 117  24
         workerIt = availableWorkerInfos.iterator();
 118  
       }
 119  24
       ownerList.add(owner);
 120  
     }
 121  24
     this.partitionOwnerList = ownerList;
 122  24
     return ownerList;
 123  
   }
 124  
 
 125  
   @Override
 126  
   public Collection<PartitionOwner> getCurrentPartitionOwners() {
 127  36
     return partitionOwnerList;
 128  
   }
 129  
 
 130  
   /**
 131  
    * Subclasses can set the partition owner list.
 132  
    *
 133  
    * @param partitionOwnerList New partition owner list.
 134  
    */
 135  
   protected void setPartitionOwnerList(List<PartitionOwner>
 136  
   partitionOwnerList) {
 137  16
     this.partitionOwnerList = partitionOwnerList;
 138  16
   }
 139  
 
 140  
   @Override
 141  
   public Collection<PartitionOwner> generateChangedPartitionOwners(
 142  
       Collection<PartitionStats> allPartitionStatsList,
 143  
       Collection<WorkerInfo> availableWorkerInfos,
 144  
       int maxWorkers,
 145  
       long superstep) {
 146  178
     return PartitionBalancer.balancePartitionsAcrossWorkers(
 147  
         conf,
 148  
         partitionOwnerList,
 149  
         allPartitionStatsList,
 150  
         availableWorkerInfos);
 151  
   }
 152  
 
 153  
   @Override
 154  
   public PartitionStats createPartitionStats() {
 155  218
     return new PartitionStats();
 156  
   }
 157  
 }