Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
StripingUtils |
|
| 1.1875;1.188 | ||||
StripingUtils$1 |
|
| 1.1875;1.188 | ||||
StripingUtils$2 |
|
| 1.1875;1.188 | ||||
StripingUtils$2$1 |
|
| 1.1875;1.188 | ||||
StripingUtils$3 |
|
| 1.1875;1.188 | ||||
StripingUtils$3$1 |
|
| 1.1875;1.188 | ||||
StripingUtils$4 |
|
| 1.1875;1.188 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.block_app.library.striping; | |
19 | ||
20 | import org.apache.giraph.block_app.framework.block.Block; | |
21 | import org.apache.giraph.block_app.framework.block.FilteringBlock; | |
22 | import org.apache.giraph.block_app.framework.block.SequenceBlock; | |
23 | import org.apache.giraph.function.Function; | |
24 | import org.apache.giraph.function.Predicate; | |
25 | import org.apache.giraph.function.primitive.Int2ObjFunction; | |
26 | import org.apache.giraph.function.primitive.Obj2IntFunction; | |
27 | import org.apache.giraph.function.vertex.SupplierFromVertex; | |
28 | import org.apache.giraph.graph.Vertex; | |
29 | import org.apache.hadoop.io.LongWritable; | |
30 | import org.apache.hadoop.io.Writable; | |
31 | import org.apache.hadoop.io.WritableComparable; | |
32 | ||
33 | import com.google.common.base.Preconditions; | |
34 | ||
35 | /** | |
36 | * Utility functions for doing superstep striping. | |
37 | * | |
38 | * We need to make sure that partitioning (which uses mod for distributing | |
39 | * data across workers) is independent from striping itself. So we are using | |
40 | * fastHash function below, taken from https://code.google.com/p/fast-hash/. | |
41 | */ | |
42 | public class StripingUtils { | |
43 | 0 | private StripingUtils() { } |
44 | ||
45 | /* The MIT License | |
46 | ||
47 | Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com) | |
48 | ||
49 | Permission is hereby granted, free of charge, to any person | |
50 | obtaining a copy of this software and associated documentation | |
51 | files (the "Software"), to deal in the Software without | |
52 | restriction, including without limitation the rights to use, copy, | |
53 | modify, merge, publish, distribute, sublicense, and/or sell copies | |
54 | of the Software, and to permit persons to whom the Software is | |
55 | furnished to do so, subject to the following conditions: | |
56 | ||
57 | The above copyright notice and this permission notice shall be | |
58 | included in all copies or substantial portions of the Software. | |
59 | ||
60 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
61 | EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
62 | MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
63 | NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS | |
64 | BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN | |
65 | ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
66 | CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
67 | SOFTWARE. | |
68 | */ | |
69 | /** | |
70 | * Returns 32-bit hash of a given value. | |
71 | * | |
72 | * Fast and generally good hashing function, adapted from C++ implementation: | |
73 | * https://code.google.com/p/fast-hash/ | |
74 | */ | |
75 | public static int fastHash(long h) { | |
76 | 0 | h ^= h >> 23; |
77 | 0 | h *= 0x2127599bf4325c37L; |
78 | 0 | h ^= h >> 47; |
79 | 0 | return ((int) (h - (h >> 32))) & 0x7fffffff; |
80 | } | |
81 | ||
82 | /** | |
83 | * Returns number in [0, stripes) range, from given input {@code value}. | |
84 | */ | |
85 | public static int fastStripe(long value, int stripes) { | |
86 | 0 | return fastHash(value) % stripes; |
87 | } | |
88 | ||
89 | /** | |
90 | * Fast hash-based striping for LongWritable IDs, returns a function | |
91 | * that for a given ID returns it's stripe index. | |
92 | */ | |
93 | public static | |
94 | Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) { | |
95 | 0 | return new Obj2IntFunction<LongWritable>() { |
96 | @Override | |
97 | public int apply(LongWritable id) { | |
98 | 0 | return fastStripe(id.get(), stripes); |
99 | } | |
100 | }; | |
101 | } | |
102 | ||
103 | /** | |
104 | * Fast hash-based striping for LongWritable IDs, returns a function | |
105 | * that for a given stripe index returns a predicate checking whether ID is | |
106 | * in that stripe. | |
107 | */ | |
108 | public static | |
109 | Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate( | |
110 | final int stripes) { | |
111 | 0 | return new Int2ObjFunction<Predicate<LongWritable>>() { |
112 | @Override | |
113 | public Predicate<LongWritable> apply(final int stripe) { | |
114 | 0 | return new Predicate<LongWritable>() { |
115 | @Override | |
116 | public boolean apply(LongWritable id) { | |
117 | 0 | return fastStripe(id.get(), stripes) == stripe; |
118 | } | |
119 | }; | |
120 | } | |
121 | }; | |
122 | } | |
123 | ||
124 | /** | |
125 | * Generate striped block, with given number of {@code stripes}, | |
126 | * using given {@code blockGenerator} to generate block for each stripe. | |
127 | * | |
128 | * @param stripes Number of stripes | |
129 | * @param blockGenerator Function given predicate representing whether | |
130 | * ID is in current stripe, should return Block | |
131 | * for current stripe | |
132 | * @return Resulting block | |
133 | */ | |
134 | public static Block generateStripedBlock( | |
135 | int stripes, | |
136 | Function<Predicate<LongWritable>, Block> blockGenerator) { | |
137 | 0 | return generateStripedBlockImpl( |
138 | stripes, blockGenerator, | |
139 | 0 | StripingUtils.fastHashStripingPredicate(stripes)); |
140 | } | |
141 | ||
142 | /** | |
143 | * Generate striped block, with given number of {@code stripes}, | |
144 | * using given {@code blockGenerator} to generate block for each stripe, | |
145 | * and using striping based on given {@code stripeSupplier}. | |
146 | * | |
147 | * @param stripes Number of stripes | |
148 | * @param blockGenerator Function given predicate representing whether | |
149 | * ID is in current stripe, should return Block | |
150 | * for current stripe | |
151 | * @param stripeSupplier Function given number of stripes, | |
152 | * generates a function that given stripe index, | |
153 | * returns predicate checking whether ID is in that | |
154 | * stripe. | |
155 | * @return Resulting block | |
156 | */ | |
157 | public static <I extends WritableComparable> | |
158 | Block generateStripedBlock( | |
159 | int stripes, | |
160 | Function<Predicate<I>, Block> blockGenerator, | |
161 | Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) { | |
162 | 0 | return generateStripedBlockImpl( |
163 | 0 | stripes, blockGenerator, stripeSupplier.apply(stripes)); |
164 | } | |
165 | ||
166 | /** | |
167 | * Stripe given block, by calling vertexSend only in it's corresponding | |
168 | * stripe. All other methods are called number of stripes times. | |
169 | * | |
170 | * @param stripes Number of stripes | |
171 | * @param block Block to stripe | |
172 | * @return Resulting block | |
173 | */ | |
174 | public static Block stripeBlockBySenders( | |
175 | int stripes, | |
176 | Block block) { | |
177 | 0 | return generateStripedBlockImpl( |
178 | stripes, | |
179 | 0 | StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block), |
180 | 0 | StripingUtils.fastHashStripingPredicate(stripes)); |
181 | } | |
182 | ||
183 | /** | |
184 | * Given a block, creates a function that will given a predicate filter | |
185 | * calls to vertexSend function based on that predicate. | |
186 | * | |
187 | * Useful to be combined with generateStripedBlock to stripe blocks. | |
188 | */ | |
189 | public static <I extends WritableComparable> Function<Predicate<I>, Block> | |
190 | createSingleStripeBySendersFunction(final Block block) { | |
191 | 0 | return new Function<Predicate<I>, Block>() { |
192 | @Override | |
193 | public Block apply(final Predicate<I> stripePredicate) { | |
194 | 0 | return FilteringBlock.createSendFiltering( |
195 | 0 | new SupplierFromVertex<I, Writable, Writable, Boolean>() { |
196 | @Override | |
197 | public Boolean get(Vertex<I, Writable, Writable> vertex) { | |
198 | 0 | return stripePredicate.apply(vertex.getId()); |
199 | } | |
200 | }, block); | |
201 | } | |
202 | }; | |
203 | } | |
204 | ||
205 | private static <I extends WritableComparable> | |
206 | Block generateStripedBlockImpl( | |
207 | int stripes, | |
208 | Function<Predicate<I>, Block> blockGenerator, | |
209 | Int2ObjFunction<Predicate<I>> stripeSupplier) { | |
210 | 0 | Preconditions.checkArgument(stripes >= 1); |
211 | 0 | if (stripes == 1) { |
212 | 0 | return blockGenerator.apply(new Predicate<I>() { |
213 | @Override | |
214 | public boolean apply(I input) { | |
215 | 0 | return true; |
216 | } | |
217 | }); | |
218 | } | |
219 | 0 | Block[] blocks = new Block[stripes]; |
220 | 0 | for (int i = 0; i < stripes; i++) { |
221 | 0 | blocks[i] = blockGenerator.apply(stripeSupplier.apply(i)); |
222 | } | |
223 | 0 | return new SequenceBlock(blocks); |
224 | } | |
225 | } |