1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages.primitives.long_id;
20
21 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
23
24 import java.util.List;
25
26 import org.apache.giraph.comm.messages.PartitionSplitInfo;
27 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28 import org.apache.giraph.factories.MessageValueFactory;
29 import org.apache.giraph.graph.Vertex;
30 import org.apache.giraph.partition.Partition;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Writable;
33
34
35
36
37
38
39
40
41
42
43 public abstract class LongAbstractListStore<M extends Writable,
44 L extends List> extends LongAbstractStore<M, L> {
45
46
47
48
49
50 private final
51 Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
52
53
54
55
56
57
58
59
60 public LongAbstractListStore(
61 MessageValueFactory<M> messageValueFactory,
62 PartitionSplitInfo<LongWritable> partitionInfo,
63 ImmutableClassesGiraphConfiguration<LongWritable,
64 Writable, Writable> config) {
65 super(messageValueFactory, partitionInfo, config);
66 populateMap();
67
68
69 nascentMap = new Int2ObjectOpenHashMap<>();
70 for (int partitionId : partitionInfo.getPartitionIds()) {
71 nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
72 }
73 }
74
75
76
77
78 private void populateMap() {
79
80 partitionInfo.startIteration();
81 while (true) {
82 Partition partition = partitionInfo.getNextPartition();
83 if (partition == null) {
84 break;
85 }
86 Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
87 for (Object obj : partition) {
88 Vertex vertex = (Vertex) obj;
89 LongWritable vertexId = (LongWritable) vertex.getId();
90 partitionMap.put(vertexId.get(), createList());
91 }
92 partitionInfo.putPartition(partition);
93 }
94 }
95
96
97
98
99
100 protected abstract L createList();
101
102
103
104
105
106
107
108 protected L getList(LongWritable vertexId) {
109 long id = vertexId.get();
110 int partitionId = partitionInfo.getPartitionId(vertexId);
111 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
112 L list = partitionMap.get(id);
113 if (list == null) {
114 Long2ObjectOpenHashMap<L> nascentPartitionMap =
115 nascentMap.get(partitionId);
116
117
118 synchronized (nascentPartitionMap) {
119 list = nascentPartitionMap.get(id);
120 if (list == null) {
121 list = createList();
122 nascentPartitionMap.put(id, list);
123 }
124 return list;
125 }
126 }
127 return list;
128 }
129
130 @Override
131 public void finalizeStore() {
132 for (int partitionId : nascentMap.keySet()) {
133
134 map.get(partitionId).putAll(nascentMap.get(partitionId));
135 }
136 nascentMap.clear();
137 }
138
139 @Override
140 public boolean hasMessagesForVertex(LongWritable vertexId) {
141 int partitionId = partitionInfo.getPartitionId(vertexId);
142 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
143 L list = partitionMap.get(vertexId.get());
144 if (list != null && !list.isEmpty()) {
145 return true;
146 }
147 Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
148 return nascentMessages != null &&
149 nascentMessages.containsKey(vertexId.get());
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 }