1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.graph.partition; |
20 | |
|
21 | |
import java.io.DataInput; |
22 | |
import java.io.DataOutput; |
23 | |
import java.io.IOException; |
24 | |
|
25 | |
import org.apache.giraph.graph.WorkerInfo; |
26 | |
import org.apache.hadoop.conf.Configurable; |
27 | |
import org.apache.hadoop.conf.Configuration; |
28 | |
|
29 | |
|
30 | |
|
31 | |
|
32 | |
|
33 | |
public class BasicPartitionOwner implements PartitionOwner, Configurable { |
34 | |
|
35 | |
private Configuration conf; |
36 | |
|
37 | 3566 | private int partitionId = -1; |
38 | |
|
39 | |
private WorkerInfo workerInfo; |
40 | |
|
41 | |
private WorkerInfo previousWorkerInfo; |
42 | |
|
43 | |
private String checkpointFilesPrefix; |
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | 872 | public BasicPartitionOwner() { } |
49 | |
|
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) { |
57 | 3114 | this(partitionId, workerInfo, null, null); |
58 | 3114 | } |
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
|
66 | |
|
67 | |
|
68 | |
public BasicPartitionOwner(int partitionId, |
69 | |
WorkerInfo workerInfo, |
70 | |
WorkerInfo previousWorkerInfo, |
71 | 3130 | String checkpointFilesPrefix) { |
72 | 3130 | this.partitionId = partitionId; |
73 | 3130 | this.workerInfo = workerInfo; |
74 | 3130 | this.previousWorkerInfo = previousWorkerInfo; |
75 | 3130 | this.checkpointFilesPrefix = checkpointFilesPrefix; |
76 | 3130 | } |
77 | |
|
78 | |
@Override |
79 | |
public int getPartitionId() { |
80 | 7366 | return partitionId; |
81 | |
} |
82 | |
|
83 | |
@Override |
84 | |
public WorkerInfo getWorkerInfo() { |
85 | 3610 | return workerInfo; |
86 | |
} |
87 | |
|
88 | |
@Override |
89 | |
public void setWorkerInfo(WorkerInfo workerInfo) { |
90 | 0 | this.workerInfo = workerInfo; |
91 | 0 | } |
92 | |
|
93 | |
@Override |
94 | |
public WorkerInfo getPreviousWorkerInfo() { |
95 | 238 | return previousWorkerInfo; |
96 | |
} |
97 | |
|
98 | |
@Override |
99 | |
public void setPreviousWorkerInfo(WorkerInfo workerInfo) { |
100 | 8 | this.previousWorkerInfo = workerInfo; |
101 | 8 | } |
102 | |
|
103 | |
@Override |
104 | |
public String getCheckpointFilesPrefix() { |
105 | 0 | return checkpointFilesPrefix; |
106 | |
} |
107 | |
|
108 | |
@Override |
109 | |
public void setCheckpointFilesPrefix(String checkpointFilesPrefix) { |
110 | 0 | this.checkpointFilesPrefix = checkpointFilesPrefix; |
111 | 0 | } |
112 | |
|
113 | |
@Override |
114 | |
public void readFields(DataInput input) throws IOException { |
115 | 218 | partitionId = input.readInt(); |
116 | 218 | workerInfo = new WorkerInfo(); |
117 | 218 | workerInfo.readFields(input); |
118 | 218 | boolean hasPreviousWorkerInfo = input.readBoolean(); |
119 | 218 | if (hasPreviousWorkerInfo) { |
120 | 0 | previousWorkerInfo = new WorkerInfo(); |
121 | 0 | previousWorkerInfo.readFields(input); |
122 | |
} |
123 | 218 | boolean hasCheckpointFilePrefix = input.readBoolean(); |
124 | 218 | if (hasCheckpointFilePrefix) { |
125 | 0 | checkpointFilesPrefix = input.readUTF(); |
126 | |
} |
127 | 218 | } |
128 | |
|
129 | |
@Override |
130 | |
public void write(DataOutput output) throws IOException { |
131 | 218 | output.writeInt(partitionId); |
132 | 218 | workerInfo.write(output); |
133 | 218 | if (previousWorkerInfo != null) { |
134 | 0 | output.writeBoolean(true); |
135 | 0 | previousWorkerInfo.write(output); |
136 | |
} else { |
137 | 218 | output.writeBoolean(false); |
138 | |
} |
139 | 218 | if (checkpointFilesPrefix != null) { |
140 | 0 | output.writeBoolean(true); |
141 | 0 | output.writeUTF(checkpointFilesPrefix); |
142 | |
} else { |
143 | 218 | output.writeBoolean(false); |
144 | |
} |
145 | 218 | } |
146 | |
|
147 | |
@Override |
148 | |
public Configuration getConf() { |
149 | 0 | return conf; |
150 | |
} |
151 | |
|
152 | |
@Override |
153 | |
public void setConf(Configuration conf) { |
154 | 218 | this.conf = conf; |
155 | 218 | } |
156 | |
|
157 | |
@Override |
158 | |
public String toString() { |
159 | 32 | return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" + |
160 | |
previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")"; |
161 | |
} |
162 | |
} |