Coverage Report - org.apache.giraph.graph.partition.Partition
 
Classes in this File Line Coverage Branch Coverage Complexity
Partition
70%
22/31
58%
7/12
1.636
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph.partition;
 20  
 
 21  
 import org.apache.giraph.graph.BspUtils;
 22  
 import org.apache.giraph.graph.GiraphJob;
 23  
 import org.apache.giraph.graph.Vertex;
 24  
 import org.apache.hadoop.conf.Configuration;
 25  
 import org.apache.hadoop.io.Writable;
 26  
 import org.apache.hadoop.io.WritableComparable;
 27  
 
 28  
 import com.google.common.collect.Maps;
 29  
 
 30  
 import java.io.DataInput;
 31  
 import java.io.DataOutput;
 32  
 import java.io.IOException;
 33  
 import java.util.Collection;
 34  
 import java.util.concurrent.ConcurrentMap;
 35  
 import java.util.concurrent.ConcurrentSkipListMap;
 36  
 
 37  
 /**
 38  
  * A generic container that stores vertices.  Vertex ids will map to exactly
 39  
  * one partition.
 40  
  *
 41  
  * @param <I> Vertex id
 42  
  * @param <V> Vertex data
 43  
  * @param <E> Edge data
 44  
  * @param <M> Message data
 45  
  */
 46  
 @SuppressWarnings("rawtypes")
 47  
 public class Partition<I extends WritableComparable,
 48  
     V extends Writable, E extends Writable, M extends Writable>
 49  
     implements Writable {
 50  
   /** Configuration from the worker */
 51  
   private final Configuration conf;
 52  
   /** Partition id */
 53  
   private final int id;
 54  
   /** Vertex map for this range (keyed by index) */
 55  
   private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
 56  
 
 57  
   /**
 58  
    * Constructor.
 59  
    *
 60  
    * @param conf Configuration.
 61  
    * @param id Partition id.
 62  
    */
 63  72
   public Partition(Configuration conf, int id) {
 64  72
     this.conf = conf;
 65  72
     this.id = id;
 66  72
     if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
 67  
         GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
 68  0
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
 69  
     } else {
 70  72
       vertexMap = Maps.newConcurrentMap();
 71  
     }
 72  72
   }
 73  
 
 74  
   /**
 75  
    * Get the vertex for this vertex index.
 76  
    *
 77  
    * @param vertexIndex Vertex index to search for
 78  
    * @return Vertex if it exists, null otherwise
 79  
    */
 80  
   public Vertex<I, V, E, M> getVertex(I vertexIndex) {
 81  45
     return vertexMap.get(vertexIndex);
 82  
   }
 83  
 
 84  
   /**
 85  
    * Put a vertex into the Partition
 86  
    *
 87  
    * @param vertex Vertex to put in the Partition
 88  
    * @return old vertex value (i.e. null if none existed prior)
 89  
    */
 90  
   public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
 91  480
     return vertexMap.put(vertex.getId(), vertex);
 92  
   }
 93  
 
 94  
   /**
 95  
    * Remove a vertex from the Partition
 96  
    *
 97  
    * @param vertexIndex Vertex index to remove
 98  
    * @return The removed vertex.
 99  
    */
 100  
   public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
 101  10
     return vertexMap.remove(vertexIndex);
 102  
   }
 103  
 
 104  
   /**
 105  
    * Get a collection of the vertices.
 106  
    *
 107  
    * @return Collection of the vertices
 108  
    */
 109  
   public Collection<Vertex<I, V, E , M>> getVertices() {
 110  460
     return vertexMap.values();
 111  
   }
 112  
 
 113  
   /**
 114  
    * Put several vertices in the partition.
 115  
    *
 116  
    * @param vertices Vertices to add
 117  
    */
 118  
   public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
 119  28
     for (Vertex<I, V, E , M> vertex : vertices) {
 120  437
       vertexMap.put(vertex.getId(), vertex);
 121  
     }
 122  28
   }
 123  
 
 124  
   /**
 125  
    * Get the number of edges in this partition.  Computed on the fly.
 126  
    *
 127  
    * @return Number of edges.
 128  
    */
 129  
   public long getEdgeCount() {
 130  24
     long edges = 0;
 131  24
     for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
 132  423
       edges += vertex.getNumEdges();
 133  
     }
 134  24
     return edges;
 135  
   }
 136  
 
 137  
   /**
 138  
    * Get the partition id.
 139  
    *
 140  
    * @return Id of this partition.
 141  
    */
 142  
   public int getId() {
 143  340
     return id;
 144  
   }
 145  
 
 146  
   @Override
 147  
   public String toString() {
 148  0
     return "(id=" + getId() + ",V=" + vertexMap.size() +
 149  
         ",E=" + getEdgeCount() + ")";
 150  
   }
 151  
 
 152  
   @Override
 153  
   public void readFields(DataInput input) throws IOException {
 154  0
     int vertices = input.readInt();
 155  0
     for (int i = 0; i < vertices; ++i) {
 156  0
       Vertex<I, V, E, M> vertex =
 157  
         BspUtils.<I, V, E, M>createVertex(conf);
 158  0
       vertex.readFields(input);
 159  0
       if (vertexMap.put(vertex.getId(),
 160  
           (Vertex<I, V, E, M>) vertex) != null) {
 161  0
         throw new IllegalStateException(
 162  
             "readFields: " + this +
 163  
             " already has same id " + vertex);
 164  
       }
 165  
     }
 166  0
   }
 167  
 
 168  
   @Override
 169  
   public void write(DataOutput output) throws IOException {
 170  4
     output.writeInt(vertexMap.size());
 171  4
     for (Vertex vertex : vertexMap.values()) {
 172  20
       vertex.write(output);
 173  
     }
 174  4
   }
 175  
 }