Coverage Report - org.apache.giraph.block_app.library.internal.SendMessagePiece
 
Classes in this File Line Coverage Branch Coverage Complexity
SendMessagePiece
0%
0/14
N/A
0
SendMessagePiece$1
0%
0/6
N/A
0
SendMessagePiece$1$1
0%
0/2
N/A
0
SendMessagePiece$1$2
0%
0/3
N/A
0
SendMessagePiece$1$2$1
0%
0/2
N/A
0
SendMessagePiece$2
0%
0/6
0%
0/6
0
SendMessagePiece$3
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.library.internal;
 19  
 
 20  
 import java.util.Iterator;
 21  
 
 22  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 23  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 24  
 import org.apache.giraph.block_app.framework.block.Block;
 25  
 import org.apache.giraph.block_app.framework.piece.Piece;
 26  
 import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
 27  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 28  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 29  
 import org.apache.giraph.block_app.library.striping.StripingUtils;
 30  
 import org.apache.giraph.function.Function;
 31  
 import org.apache.giraph.function.Predicate;
 32  
 import org.apache.giraph.function.primitive.Int2ObjFunction;
 33  
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 34  
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 35  
 import org.apache.giraph.graph.Vertex;
 36  
 import org.apache.hadoop.io.Writable;
 37  
 import org.apache.hadoop.io.WritableComparable;
 38  
 
 39  
 import com.google.common.base.Preconditions;
 40  
 import com.google.common.collect.Iterators;
 41  
 
 42  
 /**
 43  
  * Piece that sends a message provided through messageProducer to given set of
 44  
  * neighbors, and passes them to messagesConsumer.
 45  
  *
 46  
  * @param <I> Vertex id type
 47  
  * @param <V> Vertex value type
 48  
  * @param <E> Edge value type
 49  
  * @param <M> Message type
 50  
  */
 51  
 @SuppressWarnings("rawtypes")
 52  0
 public class SendMessagePiece<I extends WritableComparable, V extends Writable,
 53  
     E extends Writable, M extends Writable> extends Piece<I, V, E, M, Object> {
 54  
   private final String name;
 55  
   private final Class<M> messageClass;
 56  
   private final SupplierFromVertex<I, V, E, M> messageSupplier;
 57  
   private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
 58  
   private final ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer;
 59  
 
 60  
   public SendMessagePiece(String name,
 61  
       Class<M> messageClass,
 62  
       SupplierFromVertex<I, V, E, M> messageSupplier,
 63  
       SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
 64  0
       ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
 65  0
     Preconditions.checkNotNull(messageClass);
 66  0
     this.name = name;
 67  0
     this.messageClass = messageClass;
 68  0
     this.messageSupplier = messageSupplier;
 69  0
     this.targetsSupplier = targetsSupplier;
 70  0
     this.messagesConsumer = messagesConsumer;
 71  0
   }
 72  
 
 73  
   /**
 74  
    * Stripe message sending computation across multiple stripes, in
 75  
    * each stripe only part of the vertices will receive messages.
 76  
    *
 77  
    * @param stripes Number of stripes
 78  
    * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can
 79  
    *                       use StripingUtils::fastHashStripingPredicate
 80  
    * @return Resulting block
 81  
    */
 82  
   public Block stripeByReceiver(
 83  
       int stripes,
 84  
       Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
 85  0
     return StripingUtils.generateStripedBlock(
 86  
         stripes,
 87  0
         new Function<Predicate<I>, Block>() {
 88  
           @Override
 89  
           public Block apply(final Predicate<I> stripePredicate) {
 90  0
             return FilteringPiece.createReceiveFiltering(
 91  0
                 new SupplierFromVertex<I, V, E, Boolean>() {
 92  
                   @Override
 93  
                   public Boolean get(Vertex<I, V, E> vertex) {
 94  0
                     return stripePredicate.apply(vertex.getId());
 95  
                   }
 96  
                 },
 97  
                 new SendMessagePiece<>(
 98  0
                   name,
 99  0
                   messageClass,
 100  0
                   messageSupplier,
 101  0
                   new SupplierFromVertex<I, V, E, Iterator<I>>() {
 102  
                     @Override
 103  
                     public Iterator<I> get(Vertex<I, V, E> vertex) {
 104  0
                       return Iterators.filter(
 105  0
                           targetsSupplier.get(vertex),
 106  0
                           new com.google.common.base.Predicate<I>() {
 107  
                             @Override
 108  
                             public boolean apply(I targetId) {
 109  0
                               return stripePredicate.apply(targetId);
 110  
                             }
 111  
                           });
 112  
                     }
 113  
                   },
 114  0
                   messagesConsumer));
 115  
           }
 116  
         },
 117  
         stripeSupplier);
 118  
   }
 119  
 
 120  
 
 121  
   @Override
 122  
   public VertexSender<I, V, E> getVertexSender(
 123  
       final BlockWorkerSendApi<I, V, E, M> workerApi,
 124  
       Object executionStage) {
 125  0
     return new InnerVertexSender() {
 126  
       @Override
 127  
       public void vertexSend(Vertex<I, V, E> vertex) {
 128  0
         Iterator<I> targets = targetsSupplier.get(vertex);
 129  0
         M message = messageSupplier.get(vertex);
 130  0
         if (message != null && targets != null && targets.hasNext()) {
 131  0
           workerApi.sendMessageToMultipleEdges(targets, message);
 132  
         }
 133  0
       }
 134  
     };
 135  
   }
 136  
 
 137  
   @Override
 138  
   public VertexReceiver<I, V, E, M> getVertexReceiver(
 139  
       BlockWorkerReceiveApi<I> workerApi,
 140  
       Object executionStage) {
 141  0
     return new InnerVertexReceiver() {
 142  
       @Override
 143  
       public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
 144  0
         messagesConsumer.apply(vertex, messages);
 145  0
       }
 146  
     };
 147  
   }
 148  
 
 149  
   @Override
 150  
   public Class<M> getMessageClass() {
 151  0
     return messageClass;
 152  
   }
 153  
 
 154  
   @Override
 155  
   public String toString() {
 156  0
     return name;
 157  
   }
 158  
 }