Coverage Report - org.apache.giraph.comm.SendMutationsCache
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMutationsCache
18%
7/37
0%
0/6
1.375
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import java.util.HashMap;
 22  
 import java.util.Map;
 23  
 
 24  
 import org.apache.giraph.graph.Vertex;
 25  
 import org.apache.giraph.graph.Edge;
 26  
 import org.apache.giraph.graph.VertexMutations;
 27  
 import org.apache.hadoop.io.Writable;
 28  
 import org.apache.hadoop.io.WritableComparable;
 29  
 
 30  
 /**
 31  
  * Aggregates the mutations to be sent to partitions so they can be sent in
 32  
  * bulk.
 33  
  *
 34  
  * @param <I> Vertex id
 35  
  * @param <V> Vertex data
 36  
  * @param <E> Edge data
 37  
  * @param <M> Message data
 38  
  */
 39  
 @SuppressWarnings("rawtypes")
 40  1
 public class SendMutationsCache<I extends WritableComparable,
 41  
     V extends Writable, E extends Writable, M extends Writable> {
 42  
   /** Internal cache */
 43  1
   private Map<Integer, Map<I, VertexMutations<I, V, E, M>>> mutationCache =
 44  
       new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
 45  
   /** Number of mutations in each partition */
 46  1
   private final Map<Integer, Integer> mutationCountMap =
 47  
       new HashMap<Integer, Integer>();
 48  
 
 49  
   /**
 50  
    * Get the mutations for a partition and destination vertex (creating if
 51  
    * it doesn't exist).
 52  
    *
 53  
    * @param partitionId Partition id
 54  
    * @param destVertexId Destination vertex id
 55  
    * @return Mutations for the vertex
 56  
    */
 57  
   private VertexMutations<I, V, E, M> getVertexMutations(
 58  
       Integer partitionId, I destVertexId) {
 59  0
     Map<I, VertexMutations<I, V, E, M>> idMutations =
 60  
         mutationCache.get(partitionId);
 61  0
     if (idMutations == null) {
 62  0
       idMutations = new HashMap<I, VertexMutations<I, V, E, M>>();
 63  0
       mutationCache.put(partitionId, idMutations);
 64  
     }
 65  0
     VertexMutations<I, V, E, M> mutations = idMutations.get(destVertexId);
 66  0
     if (mutations == null) {
 67  0
       mutations = new VertexMutations<I, V, E, M>();
 68  0
       idMutations.put(destVertexId, mutations);
 69  
     }
 70  0
     return mutations;
 71  
   }
 72  
 
 73  
   /**
 74  
    * Increment the number of mutations in a partition.
 75  
    *
 76  
    * @param partitionId Partition id
 77  
    * @return Number of mutations in a partition after the increment
 78  
    */
 79  
   private int incrementPartitionMutationCount(int partitionId) {
 80  0
     Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
 81  0
     if (currentPartitionMutationCount == null) {
 82  0
       currentPartitionMutationCount = 0;
 83  
     }
 84  0
     Integer updatedPartitionMutationCount =
 85  
         currentPartitionMutationCount + 1;
 86  0
     mutationCountMap.put(partitionId, updatedPartitionMutationCount);
 87  0
     return updatedPartitionMutationCount;
 88  
   }
 89  
 
 90  
   /**
 91  
    * Add an add edge mutation to the cache.
 92  
    *
 93  
    * @param partitionId Partition id
 94  
    * @param destVertexId Destination vertex id
 95  
    * @param edge Edge to be added
 96  
    * @return Number of mutations in the partition.
 97  
    */
 98  
   public int addEdgeMutation(
 99  
       Integer partitionId, I destVertexId, Edge<I, E> edge) {
 100  
     // Get the mutations for this partition
 101  0
     VertexMutations<I, V, E, M> mutations =
 102  
         getVertexMutations(partitionId, destVertexId);
 103  
 
 104  
     // Add the edge
 105  0
     mutations.addEdge(edge);
 106  
 
 107  
     // Update the number of mutations per partition
 108  0
     return incrementPartitionMutationCount(partitionId);
 109  
   }
 110  
 
 111  
   /**
 112  
    * Add a remove edge mutation to the cache.
 113  
    *
 114  
    * @param partitionId Partition id
 115  
    * @param vertexIndex Destination vertex id
 116  
    * @param destinationVertexIndex Edge vertex index to be removed
 117  
    * @return Number of mutations in the partition.
 118  
    */
 119  
   public int removeEdgeMutation(
 120  
       Integer partitionId, I vertexIndex, I destinationVertexIndex) {
 121  
     // Get the mutations for this partition
 122  0
     VertexMutations<I, V, E, M> mutations =
 123  
         getVertexMutations(partitionId, vertexIndex);
 124  
 
 125  
     // Remove the edge
 126  0
     mutations.removeEdge(destinationVertexIndex);
 127  
 
 128  
     // Update the number of mutations per partition
 129  0
     return incrementPartitionMutationCount(partitionId);
 130  
   }
 131  
 
 132  
   /**
 133  
    * Add a add vertex mutation to the cache.
 134  
    *
 135  
    * @param partitionId Partition id
 136  
    * @param vertex Vertex to be added
 137  
    * @return Number of mutations in the partition.
 138  
    */
 139  
   public int addVertexMutation(
 140  
       Integer partitionId, Vertex<I, V, E, M> vertex) {
 141  
     // Get the mutations for this partition
 142  0
     VertexMutations<I, V, E, M> mutations =
 143  
         getVertexMutations(partitionId, vertex.getId());
 144  
 
 145  
     // Add the vertex
 146  0
     mutations.addVertex(vertex);
 147  
 
 148  
     // Update the number of mutations per partition
 149  0
     return incrementPartitionMutationCount(partitionId);
 150  
   }
 151  
 
 152  
   /**
 153  
    * Add a remove vertex mutation to the cache.
 154  
    *
 155  
    * @param partitionId Partition id
 156  
    * @param destVertexId Vertex index to be removed
 157  
    * @return Number of mutations in the partition.
 158  
    */
 159  
   public int removeVertexMutation(
 160  
       Integer partitionId, I destVertexId) {
 161  
     // Get the mutations for this partition
 162  0
     VertexMutations<I, V, E, M> mutations =
 163  
         getVertexMutations(partitionId, destVertexId);
 164  
 
 165  
     // Remove the vertex
 166  0
     mutations.removeVertex();
 167  
 
 168  
     // Update the number of mutations per partition
 169  0
     return incrementPartitionMutationCount(partitionId);
 170  
   }
 171  
 
 172  
   /**
 173  
    * Gets the mutations for a partition and removes it from the cache.
 174  
    *
 175  
    * @param partitionId Partition id
 176  
    * @return Removed partition mutations
 177  
    */
 178  
   public Map<I, VertexMutations<I, V, E, M>> removePartitionMutations(
 179  
       int partitionId) {
 180  0
     Map<I, VertexMutations<I, V, E, M>> idMutations =
 181  
         mutationCache.remove(partitionId);
 182  0
     mutationCountMap.put(partitionId, 0);
 183  0
     return idMutations;
 184  
   }
 185  
 
 186  
   /**
 187  
    * Gets all the mutations and removes them from the cache.
 188  
    *
 189  
    * @return All vertex mutations for all partitions
 190  
    */
 191  
   public Map<Integer, Map<I, VertexMutations<I, V, E, M>>>
 192  
   removeAllPartitionMutations() {
 193  24
     Map<Integer, Map<I, VertexMutations<I, V, E, M>>> allMutations =
 194  
         mutationCache;
 195  24
     mutationCache =
 196  
         new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
 197  24
     mutationCountMap.clear();
 198  24
     return allMutations;
 199  
   }
 200  
 }