1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.utils.VertexIdData;
22 import org.apache.giraph.utils.PairList;
23 import org.apache.hadoop.io.WritableComparable;
24 import org.apache.log4j.Logger;
25
26 import java.io.DataInput;
27 import java.io.DataOutput;
28 import java.io.IOException;
29
30
31
32
33
34
35
36
37
38
39 @SuppressWarnings("unchecked")
40 public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
41 B extends VertexIdData<I, T>>
42 extends WritableRequest implements WorkerRequest {
43
44 private static final Logger LOG =
45 Logger.getLogger(SendWorkerDataRequest.class);
46
47
48
49
50
51 protected PairList<Integer, B> partitionVertexData;
52
53
54
55
56 public SendWorkerDataRequest() { }
57
58
59
60
61
62
63 public SendWorkerDataRequest(
64 PairList<Integer, B> partVertData) {
65 this.partitionVertexData = partVertData;
66 }
67
68
69
70
71
72
73
74
75 public abstract B createVertexIdData();
76
77 @Override
78 public void readFieldsRequest(DataInput input) throws IOException {
79 int numPartitions = input.readInt();
80 partitionVertexData = new PairList<Integer, B>();
81 partitionVertexData.initialize(numPartitions);
82 while (numPartitions-- > 0) {
83 final int partitionId = input.readInt();
84 B vertexIdData = createVertexIdData();
85 vertexIdData.setConf(getConf());
86 vertexIdData.readFields(input);
87 partitionVertexData.add(partitionId, vertexIdData);
88 }
89 }
90
91 @Override
92 public void writeRequest(DataOutput output) throws IOException {
93 output.writeInt(partitionVertexData.getSize());
94 PairList<Integer, B>.Iterator
95 iterator = partitionVertexData.getIterator();
96 while (iterator.hasNext()) {
97 iterator.next();
98 output.writeInt(iterator.getCurrentFirst());
99 iterator.getCurrentSecond().write(output);
100 }
101 }
102
103 @Override
104 public int getSerializedSize() {
105 int size = super.getSerializedSize() + 4;
106 PairList<Integer, B>.Iterator iterator = partitionVertexData.getIterator();
107 while (iterator.hasNext()) {
108 iterator.next();
109 size += 4 + iterator.getCurrentSecond().getSerializedSize();
110 }
111 return size;
112 }
113 }
114