1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.graph; |
19 | |
|
20 | |
import org.apache.hadoop.io.DoubleWritable; |
21 | |
import org.apache.hadoop.io.FloatWritable; |
22 | |
import org.apache.hadoop.io.LongWritable; |
23 | |
import org.apache.log4j.Logger; |
24 | |
import org.apache.mahout.math.function.DoubleProcedure; |
25 | |
import org.apache.mahout.math.function.LongFloatProcedure; |
26 | |
import org.apache.mahout.math.list.DoubleArrayList; |
27 | |
import org.apache.mahout.math.map.OpenLongFloatHashMap; |
28 | |
|
29 | |
import com.google.common.collect.UnmodifiableIterator; |
30 | |
|
31 | |
import java.io.DataInput; |
32 | |
import java.io.DataOutput; |
33 | |
import java.io.IOException; |
34 | |
import java.util.Iterator; |
35 | |
import java.util.Map; |
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | 30 | public abstract class LongDoubleFloatDoubleVertex extends |
42 | |
MutableVertex<LongWritable, DoubleWritable, FloatWritable, |
43 | |
DoubleWritable> { |
44 | |
|
45 | 1 | private static final Logger LOG = |
46 | |
Logger.getLogger(LongDoubleFloatDoubleVertex.class); |
47 | |
|
48 | 15 | private OpenLongFloatHashMap edgeMap = |
49 | |
new OpenLongFloatHashMap(); |
50 | |
|
51 | 15 | private DoubleArrayList messageList = new DoubleArrayList(); |
52 | |
|
53 | |
@Override |
54 | |
public void initialize(LongWritable id, DoubleWritable value, |
55 | |
Map<LongWritable, FloatWritable> edges, |
56 | |
Iterable<DoubleWritable> messages) { |
57 | 15 | super.initialize(id, value); |
58 | 15 | if (edges != null) { |
59 | |
for (Map.Entry<LongWritable, FloatWritable> edge : |
60 | 15 | edges.entrySet()) { |
61 | 15 | edgeMap.put(edge.getKey().get(), |
62 | |
edge.getValue().get()); |
63 | |
} |
64 | |
} |
65 | 15 | if (messages != null) { |
66 | 0 | for (DoubleWritable m : messages) { |
67 | 0 | messageList.add(m.get()); |
68 | |
} |
69 | |
} |
70 | 15 | } |
71 | |
|
72 | |
@Override |
73 | |
public final boolean addEdge(LongWritable targetId, |
74 | |
FloatWritable edgeValue) { |
75 | 0 | if (edgeMap.put(targetId.get(), edgeValue.get())) { |
76 | 0 | if (LOG.isDebugEnabled()) { |
77 | 0 | LOG.debug("addEdge: Vertex=" + getId() + |
78 | |
": already added an edge value for dest vertex id " + |
79 | |
targetId.get()); |
80 | |
} |
81 | 0 | return false; |
82 | |
} else { |
83 | 0 | return true; |
84 | |
} |
85 | |
} |
86 | |
|
87 | |
@Override |
88 | |
public FloatWritable removeEdge(LongWritable targetVertexId) { |
89 | 0 | long target = targetVertexId.get(); |
90 | 0 | if (edgeMap.containsKey(target)) { |
91 | 0 | float value = edgeMap.get(target); |
92 | 0 | edgeMap.removeKey(target); |
93 | 0 | return new FloatWritable(value); |
94 | |
} else { |
95 | 0 | return null; |
96 | |
} |
97 | |
} |
98 | |
|
99 | |
@Override |
100 | |
public Iterable<Edge<LongWritable, FloatWritable>> getEdges() { |
101 | 300 | final long[] targetVertices = edgeMap.keys().elements(); |
102 | 300 | final int numEdges = edgeMap.size(); |
103 | |
|
104 | 300 | return new Iterable<Edge<LongWritable, FloatWritable>>() { |
105 | |
@Override |
106 | |
public Iterator<Edge<LongWritable, FloatWritable>> iterator() { |
107 | 600 | return new Iterator<Edge<LongWritable, FloatWritable>>() { |
108 | 300 | private int offset = 0; |
109 | |
|
110 | |
@Override |
111 | |
public boolean hasNext() { |
112 | 600 | return offset < numEdges; |
113 | |
} |
114 | |
|
115 | |
@Override |
116 | |
public Edge<LongWritable, FloatWritable> next() { |
117 | 300 | long targetVertex = targetVertices[offset++]; |
118 | 300 | return new Edge<LongWritable, FloatWritable>( |
119 | |
new LongWritable(targetVertex), |
120 | |
new FloatWritable(targetVertex)); |
121 | |
} |
122 | |
|
123 | |
@Override |
124 | |
public void remove() { |
125 | 0 | throw new UnsupportedOperationException( |
126 | |
"Mutation disallowed for edge list via iterator"); |
127 | |
} |
128 | |
}; |
129 | |
} |
130 | |
}; |
131 | |
} |
132 | |
|
133 | |
@Override |
134 | |
public boolean hasEdge(LongWritable targetVertexId) { |
135 | 0 | return edgeMap.containsKey(targetVertexId.get()); |
136 | |
} |
137 | |
|
138 | |
@Override |
139 | |
public int getNumEdges() { |
140 | 720 | return edgeMap.size(); |
141 | |
} |
142 | |
|
143 | |
@Override |
144 | |
public final void readFields(DataInput in) throws IOException { |
145 | 0 | long id = in.readLong(); |
146 | 0 | double value = in.readDouble(); |
147 | 0 | super.initialize(new LongWritable(id), new DoubleWritable(value)); |
148 | 0 | long edgeMapSize = in.readLong(); |
149 | 0 | for (long i = 0; i < edgeMapSize; ++i) { |
150 | 0 | long targetVertexId = in.readLong(); |
151 | 0 | float edgeValue = in.readFloat(); |
152 | 0 | edgeMap.put(targetVertexId, edgeValue); |
153 | |
} |
154 | 0 | long messageListSize = in.readLong(); |
155 | 0 | for (long i = 0; i < messageListSize; ++i) { |
156 | 0 | messageList.add(in.readDouble()); |
157 | |
} |
158 | 0 | boolean halt = in.readBoolean(); |
159 | 0 | if (halt) { |
160 | 0 | voteToHalt(); |
161 | |
} else { |
162 | 0 | wakeUp(); |
163 | |
} |
164 | 0 | } |
165 | |
|
166 | |
@Override |
167 | |
public final void write(final DataOutput out) throws IOException { |
168 | 0 | out.writeLong(getId().get()); |
169 | 0 | out.writeDouble(getValue().get()); |
170 | 0 | out.writeLong(edgeMap.size()); |
171 | 0 | edgeMap.forEachPair(new LongFloatProcedure() { |
172 | |
@Override |
173 | |
public boolean apply(long destVertexId, float edgeValue) { |
174 | |
try { |
175 | 0 | out.writeLong(destVertexId); |
176 | 0 | out.writeFloat(edgeValue); |
177 | 0 | } catch (IOException e) { |
178 | 0 | throw new IllegalStateException( |
179 | |
"apply: IOException when not allowed", e); |
180 | 0 | } |
181 | 0 | return true; |
182 | |
} |
183 | |
}); |
184 | 0 | out.writeLong(messageList.size()); |
185 | 0 | messageList.forEach(new DoubleProcedure() { |
186 | |
@Override |
187 | |
public boolean apply(double message) { |
188 | |
try { |
189 | 0 | out.writeDouble(message); |
190 | 0 | } catch (IOException e) { |
191 | 0 | throw new IllegalStateException( |
192 | |
"apply: IOException when not allowed", e); |
193 | 0 | } |
194 | 0 | return true; |
195 | |
} |
196 | |
}); |
197 | 0 | out.writeBoolean(isHalted()); |
198 | 0 | } |
199 | |
|
200 | |
@Override |
201 | |
void putMessages(Iterable<DoubleWritable> messages) { |
202 | 300 | messageList.clear(); |
203 | 300 | for (DoubleWritable message : messages) { |
204 | 300 | messageList.add(message.get()); |
205 | |
} |
206 | 300 | } |
207 | |
|
208 | |
@Override |
209 | |
void releaseResources() { |
210 | |
|
211 | 360 | messageList.clear(); |
212 | 360 | } |
213 | |
|
214 | |
@Override |
215 | |
public int getNumMessages() { |
216 | 0 | return messageList.size(); |
217 | |
} |
218 | |
|
219 | |
@Override |
220 | |
public Iterable<DoubleWritable> getMessages() { |
221 | 720 | return new UnmodifiableDoubleWritableIterable(messageList); |
222 | |
} |
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
private static class UnmodifiableDoubleWritableIterable |
228 | |
implements Iterable<DoubleWritable> { |
229 | |
|
230 | |
private final DoubleArrayList elementList; |
231 | |
|
232 | |
|
233 | |
|
234 | |
|
235 | |
|
236 | |
|
237 | |
public UnmodifiableDoubleWritableIterable( |
238 | 720 | DoubleArrayList elementList) { |
239 | 720 | this.elementList = elementList; |
240 | 720 | } |
241 | |
|
242 | |
@Override |
243 | |
public Iterator<DoubleWritable> iterator() { |
244 | 660 | return new UnmodifiableDoubleWritableIterator( |
245 | |
elementList); |
246 | |
} |
247 | |
} |
248 | |
|
249 | |
|
250 | |
|
251 | |
|
252 | 15 | private static class UnmodifiableDoubleWritableIterator |
253 | |
extends UnmodifiableIterator<DoubleWritable> { |
254 | |
|
255 | |
private final DoubleArrayList elementList; |
256 | |
|
257 | 660 | private int offset = 0; |
258 | |
|
259 | |
|
260 | |
|
261 | |
|
262 | |
|
263 | |
|
264 | 660 | UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) { |
265 | 660 | this.elementList = elementList; |
266 | 660 | } |
267 | |
|
268 | |
@Override |
269 | |
public boolean hasNext() { |
270 | 960 | return offset < elementList.size(); |
271 | |
} |
272 | |
|
273 | |
@Override |
274 | |
public DoubleWritable next() { |
275 | 300 | return new DoubleWritable(elementList.get(offset++)); |
276 | |
} |
277 | |
} |
278 | |
} |