Coverage Report - org.apache.giraph.block_app.library.striping.StripingUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
StripingUtils
0%
0/23
0%
0/6
1.188
StripingUtils$1
0%
0/2
N/A
1.188
StripingUtils$2
0%
0/2
N/A
1.188
StripingUtils$2$1
0%
0/2
0%
0/2
1.188
StripingUtils$3
0%
0/2
N/A
1.188
StripingUtils$3$1
0%
0/2
N/A
1.188
StripingUtils$4
0%
0/2
N/A
1.188
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.library.striping;
 19  
 
 20  
 import org.apache.giraph.block_app.framework.block.Block;
 21  
 import org.apache.giraph.block_app.framework.block.FilteringBlock;
 22  
 import org.apache.giraph.block_app.framework.block.SequenceBlock;
 23  
 import org.apache.giraph.function.Function;
 24  
 import org.apache.giraph.function.Predicate;
 25  
 import org.apache.giraph.function.primitive.Int2ObjFunction;
 26  
 import org.apache.giraph.function.primitive.Obj2IntFunction;
 27  
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 28  
 import org.apache.giraph.graph.Vertex;
 29  
 import org.apache.hadoop.io.LongWritable;
 30  
 import org.apache.hadoop.io.Writable;
 31  
 import org.apache.hadoop.io.WritableComparable;
 32  
 
 33  
 import com.google.common.base.Preconditions;
 34  
 
 35  
 /**
 36  
  * Utility functions for doing superstep striping.
 37  
  *
 38  
  * We need to make sure that partitioning (which uses mod for distributing
 39  
  * data across workers) is independent from striping itself. So we are using
 40  
  * fastHash function below, taken from https://code.google.com/p/fast-hash/.
 41  
  */
 42  
 public class StripingUtils {
 43  0
   private StripingUtils() { }
 44  
 
 45  
   /* The MIT License
 46  
 
 47  
   Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com)
 48  
 
 49  
   Permission is hereby granted, free of charge, to any person
 50  
   obtaining a copy of this software and associated documentation
 51  
   files (the "Software"), to deal in the Software without
 52  
   restriction, including without limitation the rights to use, copy,
 53  
   modify, merge, publish, distribute, sublicense, and/or sell copies
 54  
   of the Software, and to permit persons to whom the Software is
 55  
   furnished to do so, subject to the following conditions:
 56  
 
 57  
   The above copyright notice and this permission notice shall be
 58  
   included in all copies or substantial portions of the Software.
 59  
 
 60  
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 61  
   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 62  
   MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 63  
   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 64  
   BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 65  
   ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 66  
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 67  
   SOFTWARE.
 68  
   */
 69  
   /**
 70  
    * Returns 32-bit hash of a given value.
 71  
    *
 72  
    * Fast and generally good hashing function, adapted from C++ implementation:
 73  
    * https://code.google.com/p/fast-hash/
 74  
    */
 75  
   public static int fastHash(long h) {
 76  0
     h ^= h >> 23;
 77  0
     h *= 0x2127599bf4325c37L;
 78  0
     h ^= h >> 47;
 79  0
     return ((int) (h - (h >> 32))) & 0x7fffffff;
 80  
   }
 81  
 
 82  
   /**
 83  
    * Returns number in [0, stripes) range, from given input {@code value}.
 84  
    */
 85  
   public static int fastStripe(long value, int stripes) {
 86  0
     return fastHash(value) % stripes;
 87  
   }
 88  
 
 89  
   /**
 90  
    * Fast hash-based striping for LongWritable IDs, returns a function
 91  
    * that for a given ID returns its stripe index.
 92  
    */
 93  
   public static
 94  
   Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) {
 95  0
     return new Obj2IntFunction<LongWritable>() {
 96  
       @Override
 97  
       public int apply(LongWritable id) {
 98  0
         return fastStripe(id.get(), stripes);
 99  
       }
 100  
     };
 101  
   }
 102  
 
 103  
   /**
 104  
    * Fast hash-based striping for LongWritable IDs, returns a function
 105  
    * that for a given stripe index returns a predicate checking whether ID is
 106  
    * in that stripe.
 107  
    */
 108  
   public static
 109  
   Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
 110  
       final int stripes) {
 111  0
     return new Int2ObjFunction<Predicate<LongWritable>>() {
 112  
       @Override
 113  
       public Predicate<LongWritable> apply(final int stripe) {
 114  0
         return new Predicate<LongWritable>() {
 115  
           @Override
 116  
           public boolean apply(LongWritable id) {
 117  0
             return fastStripe(id.get(), stripes) == stripe;
 118  
           }
 119  
         };
 120  
       }
 121  
     };
 122  
   }
 123  
 
 124  
   /**
 125  
    * Generate striped block, with given number of {@code stripes},
 126  
    * using given {@code blockGenerator} to generate block for each stripe.
 127  
    *
 128  
    * @param stripes Number of stripes
 129  
    * @param blockGenerator Function given predicate representing whether
 130  
    *                       ID is in current stripe, should return Block
 131  
    *                       for current stripe
 132  
    * @return Resulting block
 133  
    */
 134  
   public static Block generateStripedBlock(
 135  
       int stripes,
 136  
       Function<Predicate<LongWritable>, Block> blockGenerator) {
 137  0
     return generateStripedBlockImpl(
 138  
         stripes, blockGenerator,
 139  0
         StripingUtils.fastHashStripingPredicate(stripes));
 140  
   }
 141  
 
 142  
   /**
 143  
    * Generate striped block, with given number of {@code stripes},
 144  
    * using given {@code blockGenerator} to generate block for each stripe,
 145  
    * and using striping based on given {@code stripeSupplier}.
 146  
    *
 147  
    * @param stripes Number of stripes
 148  
    * @param blockGenerator Function given predicate representing whether
 149  
    *                       ID is in current stripe, should return Block
 150  
    *                       for current stripe
 151  
    * @param stripeSupplier Function given number of stripes,
 152  
    *                       generates a function that given stripe index,
 153  
    *                       returns predicate checking whether ID is in that
 154  
    *                       stripe.
 155  
    * @return Resulting block
 156  
    */
 157  
   public static <I extends WritableComparable>
 158  
   Block generateStripedBlock(
 159  
       int stripes,
 160  
       Function<Predicate<I>, Block> blockGenerator,
 161  
       Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
 162  0
     return generateStripedBlockImpl(
 163  0
         stripes, blockGenerator, stripeSupplier.apply(stripes));
 164  
   }
 165  
 
 166  
   /**
 167  
    * Stripe given block, by calling vertexSend only in its corresponding
 168  
    * stripe. All other methods are called once for each stripe.
 169  
    *
 170  
    * @param stripes Number of stripes
 171  
    * @param block Block to stripe
 172  
    * @return Resulting block
 173  
    */
 174  
   public static Block stripeBlockBySenders(
 175  
       int stripes,
 176  
       Block block) {
 177  0
     return generateStripedBlockImpl(
 178  
         stripes,
 179  0
         StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
 180  0
         StripingUtils.fastHashStripingPredicate(stripes));
 181  
   }
 182  
 
 183  
   /**
 184  
    * Given a block, creates a function that, given a predicate, filters
 185  
    * calls to vertexSend function based on that predicate.
 186  
    *
 187  
    * Useful to be combined with generateStripedBlock to stripe blocks.
 188  
    */
 189  
   public static <I extends WritableComparable> Function<Predicate<I>, Block>
 190  
       createSingleStripeBySendersFunction(final Block block) {
 191  0
     return new Function<Predicate<I>, Block>() {
 192  
       @Override
 193  
       public Block apply(final Predicate<I> stripePredicate) {
 194  0
         return FilteringBlock.createSendFiltering(
 195  0
             new SupplierFromVertex<I, Writable, Writable, Boolean>() {
 196  
               @Override
 197  
               public Boolean get(Vertex<I, Writable, Writable> vertex) {
 198  0
                 return stripePredicate.apply(vertex.getId());
 199  
               }
 200  
             }, block);
 201  
       }
 202  
     };
 203  
   }
 204  
 
 205  
   private static <I extends WritableComparable>
 206  
   Block generateStripedBlockImpl(
 207  
       int stripes,
 208  
       Function<Predicate<I>, Block> blockGenerator,
 209  
       Int2ObjFunction<Predicate<I>> stripeSupplier) {
 210  0
     Preconditions.checkArgument(stripes >= 1);
 211  0
     if (stripes == 1) {
 212  0
       return blockGenerator.apply(new Predicate<I>() {
 213  
         @Override
 214  
         public boolean apply(I input) {
 215  0
           return true;
 216  
         }
 217  
       });
 218  
     }
 219  0
     Block[] blocks = new Block[stripes];
 220  0
     for (int i = 0; i < stripes; i++) {
 221  0
       blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
 222  
     }
 223  0
     return new SequenceBlock(blocks);
 224  
   }
 225  
 }