1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.graph; |
20 | |
|
21 | |
import org.apache.hadoop.io.Writable; |
22 | |
import org.apache.hadoop.io.WritableComparable; |
23 | |
import org.apache.log4j.Logger; |
24 | |
|
25 | |
import com.google.common.base.Function; |
26 | |
import com.google.common.collect.Iterables; |
27 | |
import com.google.common.collect.Lists; |
28 | |
import com.google.common.collect.Maps; |
29 | |
|
30 | |
import java.io.DataInput; |
31 | |
import java.io.DataOutput; |
32 | |
import java.io.IOException; |
33 | |
import java.util.HashMap; |
34 | |
import java.util.List; |
35 | |
import java.util.Map; |
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
@SuppressWarnings("rawtypes") |
52 | 0 | public abstract class HashMapVertex<I extends WritableComparable, |
53 | |
V extends Writable, E extends Writable, M extends Writable> |
54 | |
extends MutableVertex<I, V, E, M> { |
55 | |
|
56 | 0 | private static final Logger LOG = Logger.getLogger(HashMapVertex.class); |
57 | |
|
58 | 0 | protected Map<I, E> edgeMap = new HashMap<I, E>(); |
59 | |
|
60 | 0 | private List<M> messageList = Lists.newArrayList(); |
61 | |
|
62 | |
@Override |
63 | |
public void initialize( |
64 | |
I id, V value, Map<I, E> edges, Iterable<M> messages) { |
65 | 0 | super.initialize(id, value); |
66 | 0 | edgeMap.putAll(edges); |
67 | 0 | if (messages != null) { |
68 | 0 | Iterables.<M>addAll(messageList, messages); |
69 | |
} |
70 | 0 | } |
71 | |
|
72 | |
@Override |
73 | |
public final boolean addEdge(I targetVertexId, E value) { |
74 | 0 | if (edgeMap.put(targetVertexId, value) != null) { |
75 | 0 | if (LOG.isDebugEnabled()) { |
76 | 0 | LOG.debug("addEdge: Vertex=" + getId() + |
77 | |
": already added an edge value for target vertex id " + |
78 | |
targetVertexId); |
79 | |
} |
80 | 0 | return false; |
81 | |
} else { |
82 | 0 | return true; |
83 | |
} |
84 | |
} |
85 | |
|
86 | |
@Override |
87 | |
public boolean hasEdge(I targetVertexId) { |
88 | 0 | return edgeMap.containsKey(targetVertexId); |
89 | |
} |
90 | |
|
91 | |
|
92 | |
|
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
@Override |
98 | |
public Iterable<Edge<I, E>> getEdges() { |
99 | 0 | return Iterables.transform(edgeMap.entrySet(), |
100 | 0 | new Function<Map.Entry<I, E>, Edge<I, E>>() { |
101 | |
|
102 | |
@Override |
103 | |
public Edge<I, E> apply(Map.Entry<I, E> edge) { |
104 | 0 | return new Edge<I, E>(edge.getKey(), edge.getValue()); |
105 | |
} |
106 | |
}); |
107 | |
} |
108 | |
|
109 | |
@Override |
110 | |
public E getEdgeValue(I targetVertexId) { |
111 | 0 | return edgeMap.get(targetVertexId); |
112 | |
} |
113 | |
|
114 | |
@Override |
115 | |
public int getNumEdges() { |
116 | 0 | return edgeMap.size(); |
117 | |
} |
118 | |
|
119 | |
@Override |
120 | |
public E removeEdge(I targetVertexId) { |
121 | 0 | return edgeMap.remove(targetVertexId); |
122 | |
} |
123 | |
|
124 | |
@Override |
125 | |
public final void sendMessageToAllEdges(M message) { |
126 | 0 | for (I targetVertexId : edgeMap.keySet()) { |
127 | 0 | sendMessage(targetVertexId, message); |
128 | |
} |
129 | 0 | } |
130 | |
|
131 | |
@Override |
132 | |
void putMessages(Iterable<M> messages) { |
133 | 0 | messageList.clear(); |
134 | 0 | Iterables.addAll(messageList, messages); |
135 | 0 | } |
136 | |
|
137 | |
@Override |
138 | |
public Iterable<M> getMessages() { |
139 | 0 | return Iterables.unmodifiableIterable(messageList); |
140 | |
} |
141 | |
|
142 | |
@Override |
143 | |
public int getNumMessages() { |
144 | 0 | return messageList.size(); |
145 | |
} |
146 | |
|
147 | |
@Override |
148 | |
public final void readFields(DataInput in) throws IOException { |
149 | 0 | I vertexId = BspUtils.<I>createVertexId(getConf()); |
150 | 0 | vertexId.readFields(in); |
151 | 0 | V vertexValue = BspUtils.<V>createVertexValue(getConf()); |
152 | 0 | vertexValue.readFields(in); |
153 | 0 | super.initialize(vertexId, vertexValue); |
154 | |
|
155 | 0 | int numEdges = in.readInt(); |
156 | 0 | edgeMap = Maps.newHashMapWithExpectedSize(numEdges); |
157 | 0 | for (int i = 0; i < numEdges; ++i) { |
158 | 0 | I targetVertexId = BspUtils.<I>createVertexId(getConf()); |
159 | 0 | targetVertexId.readFields(in); |
160 | 0 | E edgeValue = BspUtils.<E>createEdgeValue(getConf()); |
161 | 0 | edgeValue.readFields(in); |
162 | 0 | edgeMap.put(targetVertexId, edgeValue); |
163 | |
} |
164 | |
|
165 | 0 | int numMessages = in.readInt(); |
166 | 0 | messageList = Lists.newArrayListWithCapacity(numMessages); |
167 | 0 | for (int i = 0; i < numMessages; ++i) { |
168 | 0 | M message = BspUtils.<M>createMessageValue(getConf()); |
169 | 0 | message.readFields(in); |
170 | 0 | messageList.add(message); |
171 | |
} |
172 | |
|
173 | 0 | boolean halt = in.readBoolean(); |
174 | 0 | if (halt) { |
175 | 0 | voteToHalt(); |
176 | |
} else { |
177 | 0 | wakeUp(); |
178 | |
} |
179 | 0 | } |
180 | |
|
181 | |
@Override |
182 | |
public final void write(DataOutput out) throws IOException { |
183 | 0 | getId().write(out); |
184 | 0 | getValue().write(out); |
185 | |
|
186 | 0 | out.writeInt(edgeMap.size()); |
187 | 0 | for (Map.Entry<I, E> edge : edgeMap.entrySet()) { |
188 | 0 | edge.getKey().write(out); |
189 | 0 | edge.getValue().write(out); |
190 | |
} |
191 | |
|
192 | 0 | out.writeInt(messageList.size()); |
193 | 0 | for (M message : messageList) { |
194 | 0 | message.write(out); |
195 | |
} |
196 | |
|
197 | 0 | out.writeBoolean(isHalted()); |
198 | 0 | } |
199 | |
|
200 | |
@Override |
201 | |
void releaseResources() { |
202 | |
|
203 | 0 | messageList.clear(); |
204 | 0 | } |
205 | |
} |
206 | |
|