Coverage Report - org.apache.giraph.integration.SuperstepHashPartitionerFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
SuperstepHashPartitionerFactory
100%
3/3
N/A
2.667
SuperstepHashPartitionerFactory$SuperstepMasterPartition
100%
21/21
70%
7/10
2.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.integration;
 20  
 
 21  
 import java.util.ArrayList;
 22  
 import java.util.Collection;
 23  
 import java.util.List;
 24  
 
 25  
 import org.apache.giraph.graph.WorkerInfo;
 26  
 import org.apache.giraph.graph.partition.BasicPartitionOwner;
 27  
 import org.apache.giraph.graph.partition.HashMasterPartitioner;
 28  
 import org.apache.giraph.graph.partition.HashPartitionerFactory;
 29  
 import org.apache.giraph.graph.partition.MasterGraphPartitioner;
 30  
 import org.apache.giraph.graph.partition.PartitionOwner;
 31  
 import org.apache.giraph.graph.partition.PartitionStats;
 32  
 import org.apache.hadoop.conf.Configuration;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 import org.apache.hadoop.io.WritableComparable;
 35  
 import org.apache.log4j.Logger;
 36  
 
 37  
 /**
 38  
  * Example graph partitioner that builds on {@link HashMasterPartitioner} to
 39  
  * send the partitions to the worker that matches the superstep.  It is for
 40  
  * testing only and should never be used in practice.
 41  
  *
 42  
  * @param <I> Vertex id
 43  
  * @param <V> Vertex value
 44  
  * @param <E> Edge value
 45  
  * @param <M> Message data
 46  
  */
 47  
 @SuppressWarnings("rawtypes")
 48  4
 public class SuperstepHashPartitionerFactory<I extends WritableComparable,
 49  
     V extends Writable, E extends Writable, M extends Writable>
 50  
     extends HashPartitionerFactory<I, V, E, M> {
 51  
   /**
 52  
    * Changes the {@link HashMasterPartitioner} to make ownership of the
 53  
    * partitions based on a superstep.  For testing only as it is totally
 54  
    * unbalanced.
 55  
    *
 56  
    * @param <I> vertex id
 57  
    * @param <V> vertex data
 58  
    * @param <E> edge data
 59  
    * @param <M> message data
 60  
    */
 61  4
   private static class SuperstepMasterPartition<I extends WritableComparable,
 62  
       V extends Writable, E extends Writable, M extends Writable>
 63  
       extends HashMasterPartitioner<I, V, E, M> {
 64  
     /** Class logger */
 65  1
     private static Logger LOG =
 66  
         Logger.getLogger(SuperstepMasterPartition.class);
 67  
 
 68  
     /**
 69  
      * Construction with configuration.
 70  
      *
 71  
      * @param conf Configuration to be stored.
 72  
      */
 73  
     public SuperstepMasterPartition(Configuration conf) {
 74  2
       super(conf);
 75  2
     }
 76  
 
 77  
     @Override
 78  
     public Collection<PartitionOwner> generateChangedPartitionOwners(
 79  
         Collection<PartitionStats> allPartitionStatsList,
 80  
         Collection<WorkerInfo> availableWorkerInfos,
 81  
         int maxWorkers,
 82  
         long superstep) {
 83  
       // Assign all the partitions to
 84  
       // superstep mod availableWorkerInfos
 85  
       // Guaranteed to be different if the workers (and their order)
 86  
       // do not change
 87  16
       long workerIndex = superstep % availableWorkerInfos.size();
 88  16
       int i = 0;
 89  16
       WorkerInfo chosenWorkerInfo = null;
 90  16
       for (WorkerInfo workerInfo : availableWorkerInfos) {
 91  16
         if (workerIndex == i) {
 92  16
           chosenWorkerInfo = workerInfo;
 93  
         }
 94  16
         ++i;
 95  
       }
 96  16
       if (LOG.isInfoEnabled()) {
 97  16
         LOG.info("generateChangedPartitionOwners: Chosen worker " +
 98  
                  "for superstep " + superstep + " is " +
 99  
                  chosenWorkerInfo);
 100  
       }
 101  
 
 102  16
       List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
 103  
       for (PartitionOwner partitionOwner :
 104  16
         getCurrentPartitionOwners()) {
 105  16
         WorkerInfo prevWorkerinfo =
 106  
           partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
 107  
             null : partitionOwner.getWorkerInfo();
 108  16
         PartitionOwner tmpPartitionOwner =
 109  
           new BasicPartitionOwner(partitionOwner.getPartitionId(),
 110  
                                   chosenWorkerInfo,
 111  
                                   prevWorkerinfo,
 112  
                                   null);
 113  16
         partitionOwnerList.add(tmpPartitionOwner);
 114  16
         LOG.info("partition owner was " + partitionOwner +
 115  
             ", new " + tmpPartitionOwner);
 116  16
       }
 117  16
       setPartitionOwnerList(partitionOwnerList);
 118  16
       return partitionOwnerList;
 119  
     }
 120  
   }
 121  
 
 122  
   @Override
 123  
   public MasterGraphPartitioner<I, V, E, M>
 124  
   createMasterGraphPartitioner() {
 125  2
     return new SuperstepMasterPartition<I, V, E, M>(getConf());
 126  
   }
 127  
 }