Coverage Report - org.apache.giraph.graph.partition.PartitionUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
PartitionUtils
86%
20/23
70%
7/10
2.5
PartitionUtils$1
N/A
N/A
2.5
PartitionUtils$EdgeCountComparator
50%
1/2
N/A
2.5
PartitionUtils$VertexCountComparator
50%
1/2
N/A
2.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph.partition;
 20  
 
 21  
 import java.util.Collection;
 22  
 import java.util.Collections;
 23  
 import java.util.Comparator;
 24  
 import java.util.HashMap;
 25  
 import java.util.List;
 26  
 import java.util.Map;
 27  
 import java.util.Map.Entry;
 28  
 
 29  
 import org.apache.giraph.graph.VertexEdgeCount;
 30  
 import org.apache.giraph.graph.WorkerInfo;
 31  
 import org.apache.log4j.Logger;
 32  
 
 33  
 import com.google.common.collect.Lists;
 34  
 import com.google.common.collect.Maps;
 35  
 
 36  
 /**
 37  
  * Helper class for {@link Partition} related operations.
 38  
  */
 39  
 public class PartitionUtils {
 40  
   /** Class logger */
 41  1
   private static Logger LOG = Logger.getLogger(PartitionUtils.class);
 42  
 
 43  
   /**
 44  
    * Do not construct this object.
 45  
    */
 46  0
   private PartitionUtils() { }
 47  
 
 48  
   /**
 49  
    * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects.
 50  
    */
 51  388
   private static class EdgeCountComparator implements
 52  
       Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
 53  
     @Override
 54  
     public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
 55  
         Entry<WorkerInfo, VertexEdgeCount> worker2) {
 56  0
       return (int) (worker1.getValue().getEdgeCount() -
 57  
         worker2.getValue().getEdgeCount());
 58  
     }
 59  
   }
 60  
 
 61  
   /**
 62  
    * Compare vertex counts between a {@link WorkerInfo} and
 63  
    * {@link VertexEdgeCount}.
 64  
    */
 65  388
   private static class VertexCountComparator implements
 66  
       Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
 67  
     @Override
 68  
     public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
 69  
         Entry<WorkerInfo, VertexEdgeCount> worker2) {
 70  0
       return (int) (worker1.getValue().getVertexCount() -
 71  
         worker2.getValue().getVertexCount());
 72  
     }
 73  
   }
 74  
 
 75  
   /**
 76  
    * Check for imbalances on a per worker basis, by calculating the
 77  
    * mean, high and low workers by edges and vertices.
 78  
    *
 79  
    * @param partitionOwnerList List of partition owners.
 80  
    * @param allPartitionStats All the partition stats.
 81  
    */
 82  
   public static void analyzePartitionStats(
 83  
       Collection<PartitionOwner> partitionOwnerList,
 84  
       List<PartitionStats> allPartitionStats) {
 85  194
     Map<Integer, PartitionOwner> idOwnerMap =
 86  
         new HashMap<Integer, PartitionOwner>();
 87  194
     for (PartitionOwner partitionOwner : partitionOwnerList) {
 88  194
       if (idOwnerMap.put(partitionOwner.getPartitionId(),
 89  
           partitionOwner) != null) {
 90  0
         throw new IllegalStateException(
 91  
             "analyzePartitionStats: Duplicate partition " +
 92  
                 partitionOwner);
 93  
       }
 94  
     }
 95  
 
 96  194
     Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
 97  194
     VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
 98  194
     for (PartitionStats partitionStats : allPartitionStats) {
 99  194
       WorkerInfo workerInfo =
 100  
           idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
 101  194
       VertexEdgeCount vertexEdgeCount =
 102  
           workerStatsMap.get(workerInfo);
 103  194
       if (vertexEdgeCount == null) {
 104  194
         workerStatsMap.put(
 105  
             workerInfo,
 106  
             new VertexEdgeCount(partitionStats.getVertexCount(),
 107  
                 partitionStats.getEdgeCount()));
 108  
       } else {
 109  0
         workerStatsMap.put(
 110  
             workerInfo,
 111  
             vertexEdgeCount.incrVertexEdgeCount(
 112  
                 partitionStats.getVertexCount(),
 113  
                 partitionStats.getEdgeCount()));
 114  
       }
 115  194
       totalVertexEdgeCount =
 116  
           totalVertexEdgeCount.incrVertexEdgeCount(
 117  
               partitionStats.getVertexCount(),
 118  
               partitionStats.getEdgeCount());
 119  194
     }
 120  
 
 121  194
     List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
 122  
         Lists.newArrayList(workerStatsMap.entrySet());
 123  
 
 124  194
     if (LOG.isInfoEnabled()) {
 125  194
       Collections.sort(workerEntryList, new VertexCountComparator());
 126  194
       LOG.info("analyzePartitionStats: Vertices - Mean: " +
 127  
           (totalVertexEdgeCount.getVertexCount() /
 128  
               workerStatsMap.size()) +
 129  
               ", Min: " +
 130  
               workerEntryList.get(0).getKey() + " - " +
 131  
               workerEntryList.get(0).getValue().getVertexCount() +
 132  
               ", Max: " +
 133  
               workerEntryList.get(workerEntryList.size() - 1).getKey() +
 134  
               " - " +
 135  
               workerEntryList.get(workerEntryList.size() - 1).
 136  
               getValue().getVertexCount());
 137  194
       Collections.sort(workerEntryList, new EdgeCountComparator());
 138  194
       LOG.info("analyzePartitionStats: Edges - Mean: " +
 139  
           (totalVertexEdgeCount.getEdgeCount() /
 140  
               workerStatsMap.size()) +
 141  
               ", Min: " +
 142  
               workerEntryList.get(0).getKey() + " - " +
 143  
               workerEntryList.get(0).getValue().getEdgeCount() +
 144  
               ", Max: " +
 145  
               workerEntryList.get(workerEntryList.size() - 1).getKey() +
 146  
               " - " +
 147  
               workerEntryList.get(workerEntryList.size() - 1).
 148  
               getValue().getEdgeCount());
 149  
     }
 150  194
   }
 151  
 }