Coverage Report - org.apache.giraph.graph.partition.PartitionBalancer
 
Classes in this File Line Coverage Branch Coverage Complexity
PartitionBalancer
79%
35/44
66%
18/27
3.667
PartitionBalancer$1
100%
1/1
N/A
3.667
PartitionBalancer$BalanceValue
100%
4/4
N/A
3.667
PartitionBalancer$PartitionOwnerComparator
66%
4/6
N/A
3.667
PartitionBalancer$WorkerInfoAssignments
66%
10/15
50%
1/2
3.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph.partition;
 20  
 
 21  
 import java.util.ArrayList;
 22  
 import java.util.Collection;
 23  
 import java.util.Collections;
 24  
 import java.util.Comparator;
 25  
 import java.util.HashMap;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 import java.util.PriorityQueue;
 29  
 
 30  
 import org.apache.giraph.graph.WorkerInfo;
 31  
 import org.apache.hadoop.conf.Configuration;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 /**
 35  
  * Helper class for balancing partitions across a set of workers.
 36  
  */
 37  8
 public class PartitionBalancer {
 38  
   /** Partition balancing algorithm */
 39  
   public static final String PARTITION_BALANCE_ALGORITHM =
 40  
     "hash.partitionBalanceAlgorithm";
 41  
   /** No rebalancing during the supersteps */
 42  
   public static final String STATIC_BALANCE_ALGORITHM =
 43  
     "static";
 44  
   /** Rebalance across supersteps by edges */
 45  
   public static final String EGDE_BALANCE_ALGORITHM =
 46  
     "edges";
 47  
   /** Rebalance across supersteps by vertices */
 48  
   public static final String VERTICES_BALANCE_ALGORITHM =
 49  
     "vertices";
 50  
   /** Class logger */
 51  1
   private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
 52  
 
 53  
   /**
 54  
    * What value to balance partitions with?  Edges, vertices?
 55  
    */
 56  5
   private enum BalanceValue {
 57  
     /** Not chosen */
 58  1
     UNSET,
 59  
     /** Balance with edges */
 60  1
     EDGES,
 61  
     /** Balance with vertices */
 62  1
     VERTICES
 63  
   }
 64  
 
 65  
   /**
 66  
    * Do not construct this class.
 67  
    */
 68  0
   private PartitionBalancer() { }
 69  
 
 70  
   /**
 71  
    * Get the value used to balance.
 72  
    *
 73  
    * @param partitionStat Stats of this partition.
 74  
    * @param balanceValue Type of the value to balance.
 75  
    * @return Balance value.
 76  
    */
 77  
   private static long getBalanceValue(PartitionStats partitionStat,
 78  
       BalanceValue balanceValue) {
 79  1
     switch (balanceValue) {
 80  
     case EDGES:
 81  0
       return partitionStat.getEdgeCount();
 82  
     case VERTICES:
 83  8
       return partitionStat.getVertexCount();
 84  
     default:
 85  0
       throw new IllegalArgumentException(
 86  
           "getBalanceValue: Illegal balance value " + balanceValue);
 87  
     }
 88  
   }
 89  
 
 90  
   /**
 91  
    * Used to sort the partition owners from lowest value to highest value
 92  
    */
 93  0
   private static class PartitionOwnerComparator implements
 94  
       Comparator<PartitionOwner> {
 95  
     /** Map of owner to stats */
 96  
     private final Map<PartitionOwner, PartitionStats> ownerStatMap;
 97  
     /** Value type to compare on */
 98  
     private final BalanceValue balanceValue;
 99  
 
 100  
 
 101  
     /**
 102  
      * Only constructor.
 103  
      *
 104  
      * @param ownerStatMap Map of owners to stats.
 105  
      * @param balanceValue Value to balance with.
 106  
      */
 107  
     public PartitionOwnerComparator(
 108  
         Map<PartitionOwner, PartitionStats> ownerStatMap,
 109  8
         BalanceValue balanceValue) {
 110  8
       this.ownerStatMap = ownerStatMap;
 111  8
       this.balanceValue = balanceValue;
 112  8
     }
 113  
 
 114  
     @Override
 115  
     public int compare(PartitionOwner owner1, PartitionOwner owner2) {
 116  0
       return (int)
 117  
           (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
 118  
               getBalanceValue(ownerStatMap.get(owner2), balanceValue));
 119  
     }
 120  
   }
 121  
 
 122  
   /**
 123  
    * Structure to keep track of how much value a {@link WorkerInfo} has
 124  
    * been assigned.
 125  
    */
 126  0
   private static class WorkerInfoAssignments implements
 127  
       Comparable<WorkerInfoAssignments> {
 128  
     /** Worker info associated */
 129  
     private final WorkerInfo workerInfo;
 130  
     /** Balance value */
 131  
     private final BalanceValue balanceValue;
 132  
     /** Map of owner to stats */
 133  
     private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
 134  
     /** Current value of this object */
 135  8
     private long value = 0;
 136  
 
 137  
     /**
 138  
      * Constructor with final values.
 139  
      *
 140  
      * @param workerInfo Worker info for assignment.
 141  
      * @param balanceValue Value used to balance.
 142  
      * @param ownerStatsMap Map of owner to stats.
 143  
      */
 144  
     public WorkerInfoAssignments(
 145  
         WorkerInfo workerInfo,
 146  
         BalanceValue balanceValue,
 147  8
         Map<PartitionOwner, PartitionStats> ownerStatsMap) {
 148  8
       this.workerInfo = workerInfo;
 149  8
       this.balanceValue = balanceValue;
 150  8
       this.ownerStatsMap = ownerStatsMap;
 151  8
     }
 152  
 
 153  
     /**
 154  
      * Get the total value of all partitions assigned to this worker.
 155  
      *
 156  
      * @return Total value of all partition assignments.
 157  
      */
 158  
     public long getValue() {
 159  0
       return value;
 160  
     }
 161  
 
 162  
     /**
 163  
      * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
 164  
      *
 165  
      * @param partitionOwner PartitionOwner to assign.
 166  
      */
 167  
     public void assignPartitionOwner(
 168  
         PartitionOwner partitionOwner) {
 169  8
       value += getBalanceValue(ownerStatsMap.get(partitionOwner),
 170  
           balanceValue);
 171  8
       if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
 172  0
         partitionOwner.setPreviousWorkerInfo(
 173  
             partitionOwner.getWorkerInfo());
 174  0
         partitionOwner.setWorkerInfo(workerInfo);
 175  
       } else {
 176  8
         partitionOwner.setPreviousWorkerInfo(null);
 177  
       }
 178  8
     }
 179  
 
 180  
     @Override
 181  
     public int compareTo(WorkerInfoAssignments other) {
 182  0
       return (int)
 183  
           (getValue() - ((WorkerInfoAssignments) other).getValue());
 184  
     }
 185  
   }
 186  
 
 187  
   /**
 188  
    * Balance the partitions with an algorithm based on a value.
 189  
    *
 190  
    * @param conf Configuration to find the algorithm
 191  
    * @param partitionOwners All the owners of all partitions
 192  
    * @param allPartitionStats All the partition stats
 193  
    * @param availableWorkerInfos All the available workers
 194  
    * @return Balanced partition owners
 195  
    */
 196  
   public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
 197  
       Configuration conf,
 198  
       Collection<PartitionOwner> partitionOwners,
 199  
       Collection<PartitionStats> allPartitionStats,
 200  
       Collection<WorkerInfo> availableWorkerInfos) {
 201  
 
 202  178
     String balanceAlgorithm =
 203  
         conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
 204  178
     if (LOG.isInfoEnabled()) {
 205  178
       LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
 206  
           balanceAlgorithm);
 207  
     }
 208  178
     BalanceValue balanceValue = BalanceValue.UNSET;
 209  178
     if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
 210  170
       return partitionOwners;
 211  8
     } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
 212  0
       balanceValue = BalanceValue.EDGES;
 213  8
     } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
 214  8
       balanceValue = BalanceValue.VERTICES;
 215  
     } else {
 216  0
       throw new IllegalArgumentException(
 217  
           "balancePartitionsAcrossWorkers: Illegal balance " +
 218  
               "algorithm - " + balanceAlgorithm);
 219  
     }
 220  
 
 221  
     // Join the partition stats and partition owners by partition id
 222  8
     Map<Integer, PartitionStats> idStatMap =
 223  
         new HashMap<Integer, PartitionStats>();
 224  8
     for (PartitionStats partitionStats : allPartitionStats) {
 225  8
       if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
 226  
           null) {
 227  0
         throw new IllegalStateException(
 228  
             "balancePartitionsAcrossWorkers: Duplicate partition id " +
 229  
                 "for " + partitionStats);
 230  
       }
 231  
     }
 232  8
     Map<PartitionOwner, PartitionStats> ownerStatsMap =
 233  
         new HashMap<PartitionOwner, PartitionStats>();
 234  8
     for (PartitionOwner partitionOwner : partitionOwners) {
 235  8
       PartitionStats partitionStats =
 236  
           idStatMap.get(partitionOwner.getPartitionId());
 237  8
       if (partitionStats == null) {
 238  0
         throw new IllegalStateException(
 239  
             "balancePartitionsAcrossWorkers: Missing partition " +
 240  
                 "stats for " + partitionOwner);
 241  
       }
 242  8
       if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
 243  0
         throw new IllegalStateException(
 244  
             "balancePartitionsAcrossWorkers: Duplicate partition " +
 245  
                 "owner " + partitionOwner);
 246  
       }
 247  8
     }
 248  8
     if (ownerStatsMap.size() != partitionOwners.size()) {
 249  0
       throw new IllegalStateException(
 250  
           "balancePartitionsAcrossWorkers: ownerStats count = " +
 251  
               ownerStatsMap.size() + ", partitionOwners count = " +
 252  
               partitionOwners.size() + " and should match.");
 253  
     }
 254  
 
 255  8
     List<WorkerInfoAssignments> workerInfoAssignmentsList =
 256  
         new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
 257  8
     for (WorkerInfo workerInfo : availableWorkerInfos) {
 258  8
       workerInfoAssignmentsList.add(
 259  
           new WorkerInfoAssignments(
 260  
               workerInfo, balanceValue, ownerStatsMap));
 261  
     }
 262  
 
 263  
     // A simple heuristic for balancing the partitions across the workers
 264  
     // using a value (edges, vertices).  An improvement would be to
 265  
     // take into account the already existing partition worker assignments.
 266  
     // 1.  Sort the partitions by size
 267  
     // 2.  Place the workers in a min heap sorted by their total balance
 268  
     //     value.
 269  
     // 3.  From largest partition to the smallest, take the partition
 270  
     //     worker at the top of the heap, add the partition to it, and
 271  
     //     then put it back in the heap
 272  8
     List<PartitionOwner> partitionOwnerList =
 273  
         new ArrayList<PartitionOwner>(partitionOwners);
 274  8
     Collections.sort(partitionOwnerList,
 275  
         Collections.reverseOrder(
 276  
             new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
 277  8
     PriorityQueue<WorkerInfoAssignments> minQueue =
 278  
         new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
 279  8
     for (PartitionOwner partitionOwner : partitionOwnerList) {
 280  8
       WorkerInfoAssignments chosenWorker = minQueue.remove();
 281  8
       chosenWorker.assignPartitionOwner(partitionOwner);
 282  8
       minQueue.add(chosenWorker);
 283  8
     }
 284  
 
 285  8
     return partitionOwnerList;
 286  
   }
 287  
 }
 288