Coverage Report - org.apache.giraph.comm.NettyWorkerClientServer
 
Classes in this File Line Coverage Branch Coverage Complexity
NettyWorkerClientServer
68%
22/32
N/A
1
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 22  
 import org.apache.giraph.graph.Edge;
 23  
 import org.apache.giraph.graph.Vertex;
 24  
 import org.apache.giraph.graph.WorkerInfo;
 25  
 import org.apache.giraph.graph.partition.Partition;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 import org.apache.hadoop.mapreduce.Mapper;
 29  
 
 30  
 import java.io.IOException;
 31  
 import java.util.Collection;
 32  
 import java.util.Map;
 33  
 
 34  
 /**
 35  
  * Netty based implementation of the {@link WorkerClientServer} interface.
 36  
  *
 37  
  * @param <I> Vertex id
 38  
  * @param <V> Vertex data
 39  
  * @param <E> Edge data
 40  
  * @param <M> Message data
 41  
  */
 42  
 @SuppressWarnings("rawtypes")
 43  
 public class NettyWorkerClientServer<I extends WritableComparable,
 44  
     V extends Writable, E extends Writable, M extends Writable>
 45  
     implements WorkerClientServer<I, V, E, M> {
 46  
   /** Client that sends requests */
 47  
   private final WorkerClient<I, V, E, M> client;
 48  
   /** Server that processes requests */
 49  
   private final WorkerServer<I, V, E, M> server;
 50  
 
 51  
   /**
 52  
    * Constructor.
 53  
    *
 54  
    * @param context Mapper context
 55  
    * @param service Service for partition lookup
 56  
    */
 57  
   public NettyWorkerClientServer(Mapper<?, ?, ?, ?>.Context context,
 58  1
       CentralizedServiceWorker<I, V, E, M> service) {
 59  1
     server = new NettyWorkerServer<I, V, E, M>(context.getConfiguration(),
 60  
         service);
 61  1
     client = new NettyWorkerClient<I, V, E, M>(context, service,
 62  
        ((NettyWorkerServer<I, V, E, M>) server).getServerData());
 63  1
   }
 64  
 
 65  
   @Override
 66  
   public void fixPartitionIdToSocketAddrMap() {
 67  11
     client.fixPartitionIdToSocketAddrMap();
 68  11
   }
 69  
 
 70  
   @Override
 71  
   public void sendMessageRequest(I destVertexId, M message) {
 72  0
     client.sendMessageRequest(destVertexId, message);
 73  0
   }
 74  
 
 75  
   @Override
 76  
   public void sendPartitionRequest(WorkerInfo workerInfo,
 77  
                                    Partition<I, V, E, M> partition) {
 78  1
     client.sendPartitionRequest(workerInfo, partition);
 79  1
   }
 80  
 
 81  
   @Override
 82  
   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
 83  
       IOException {
 84  0
     client.addEdgeRequest(vertexIndex, edge);
 85  0
   }
 86  
 
 87  
   @Override
 88  
   public void removeEdgeRequest(I vertexIndex,
 89  
                                 I destinationVertexIndex) throws IOException {
 90  0
     client.removeEdgeRequest(vertexIndex, destinationVertexIndex);
 91  0
   }
 92  
 
 93  
   @Override
 94  
   public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
 95  0
     client.addVertexRequest(vertex);
 96  0
   }
 97  
 
 98  
   @Override
 99  
   public void removeVertexRequest(I vertexIndex) throws IOException {
 100  0
     client.removeVertexRequest(vertexIndex);
 101  0
   }
 102  
 
 103  
   @Override
 104  
   public void flush() throws IOException {
 105  24
     client.flush();
 106  24
   }
 107  
 
 108  
   @Override
 109  
   public long resetMessageCount() {
 110  12
     return client.resetMessageCount();
 111  
   }
 112  
 
 113  
   @Override
 114  
   public void closeConnections() throws IOException {
 115  1
     client.closeConnections();
 116  1
   }
 117  
 
 118  
   @Override
 119  
   public void setup() {
 120  1
     client.fixPartitionIdToSocketAddrMap();
 121  1
   }
 122  
 
 123  
   @Override
 124  
   public void prepareSuperstep() {
 125  11
     server.prepareSuperstep();
 126  11
   }
 127  
 
 128  
   @Override
 129  
   public Map<Integer, Collection<Vertex<I, V, E, M>>>
 130  
   getInPartitionVertexMap() {
 131  12
     return server.getInPartitionVertexMap();
 132  
   }
 133  
 
 134  
   @Override
 135  
   public ServerData<I, V, E, M> getServerData() {
 136  12
     return server.getServerData();
 137  
   }
 138  
 
 139  
   @Override
 140  
   public void close() {
 141  1
     server.close();
 142  1
   }
 143  
 
 144  
 
 145  
   @Override
 146  
   public int getPort() {
 147  1
     return server.getPort();
 148  
   }
 149  
 }