1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.graph.partition; |
20 | |
|
21 | |
import java.util.ArrayList; |
22 | |
import java.util.Collection; |
23 | |
import java.util.Iterator; |
24 | |
import java.util.List; |
25 | |
|
26 | |
import org.apache.giraph.graph.WorkerInfo; |
27 | |
import org.apache.hadoop.conf.Configuration; |
28 | |
import org.apache.hadoop.io.Writable; |
29 | |
import org.apache.hadoop.io.WritableComparable; |
30 | |
import org.apache.log4j.Logger; |
31 | |
|
32 | |
|
33 | |
|
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
@SuppressWarnings("rawtypes") |
41 | |
public class HashMasterPartitioner<I extends WritableComparable, |
42 | |
V extends Writable, E extends Writable, M extends Writable> implements |
43 | |
MasterGraphPartitioner<I, V, E, M> { |
44 | |
|
45 | |
public static final String PARTITION_COUNT_MULTIPLIER = |
46 | |
"hash.masterPartitionCountMultipler"; |
47 | |
|
48 | |
public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f; |
49 | |
|
50 | |
public static final String USER_PARTITION_COUNT = |
51 | |
"hash.userPartitionCount"; |
52 | |
|
53 | |
public static final int DEFAULT_USER_PARTITION_COUNT = -1; |
54 | |
|
55 | 1 | private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); |
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
private static final int MAX_PARTTIONS = 1024 * 1024 / 350; |
61 | |
|
62 | |
private Configuration conf; |
63 | |
|
64 | |
private final int userPartitionCount; |
65 | |
|
66 | 24 | private int partitionCount = -1; |
67 | |
|
68 | |
private List<PartitionOwner> partitionOwnerList; |
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | 24 | public HashMasterPartitioner(Configuration conf) { |
76 | 24 | this.conf = conf; |
77 | 24 | userPartitionCount = conf.getInt(USER_PARTITION_COUNT, |
78 | |
DEFAULT_USER_PARTITION_COUNT); |
79 | 24 | } |
80 | |
|
81 | |
@Override |
82 | |
public Collection<PartitionOwner> createInitialPartitionOwners( |
83 | |
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { |
84 | 24 | if (availableWorkerInfos.isEmpty()) { |
85 | 0 | throw new IllegalArgumentException( |
86 | |
"createInitialPartitionOwners: No available workers"); |
87 | |
} |
88 | 24 | List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>(); |
89 | 24 | Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator(); |
90 | 24 | if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) { |
91 | 24 | float multiplier = conf.getFloat( |
92 | |
PARTITION_COUNT_MULTIPLIER, |
93 | |
DEFAULT_PARTITION_COUNT_MULTIPLIER); |
94 | 24 | partitionCount = |
95 | |
Math.max((int) (multiplier * availableWorkerInfos.size() * |
96 | |
availableWorkerInfos.size()), |
97 | |
1); |
98 | 24 | } else { |
99 | 0 | partitionCount = userPartitionCount; |
100 | |
} |
101 | 24 | if (LOG.isInfoEnabled()) { |
102 | 24 | LOG.info("createInitialPartitionOwners: Creating " + |
103 | |
partitionCount + ", default would have been " + |
104 | |
(availableWorkerInfos.size() * |
105 | |
availableWorkerInfos.size()) + " partitions."); |
106 | |
} |
107 | 24 | if (partitionCount > MAX_PARTTIONS) { |
108 | 0 | LOG.warn("createInitialPartitionOwners: " + |
109 | |
"Reducing the partitionCount to " + MAX_PARTTIONS + |
110 | |
" from " + partitionCount); |
111 | 0 | partitionCount = MAX_PARTTIONS; |
112 | |
} |
113 | |
|
114 | 48 | for (int i = 0; i < partitionCount; ++i) { |
115 | 24 | PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next()); |
116 | 24 | if (!workerIt.hasNext()) { |
117 | 24 | workerIt = availableWorkerInfos.iterator(); |
118 | |
} |
119 | 24 | ownerList.add(owner); |
120 | |
} |
121 | 24 | this.partitionOwnerList = ownerList; |
122 | 24 | return ownerList; |
123 | |
} |
124 | |
|
125 | |
@Override |
126 | |
public Collection<PartitionOwner> getCurrentPartitionOwners() { |
127 | 36 | return partitionOwnerList; |
128 | |
} |
129 | |
|
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
protected void setPartitionOwnerList(List<PartitionOwner> |
136 | |
partitionOwnerList) { |
137 | 16 | this.partitionOwnerList = partitionOwnerList; |
138 | 16 | } |
139 | |
|
140 | |
@Override |
141 | |
public Collection<PartitionOwner> generateChangedPartitionOwners( |
142 | |
Collection<PartitionStats> allPartitionStatsList, |
143 | |
Collection<WorkerInfo> availableWorkerInfos, |
144 | |
int maxWorkers, |
145 | |
long superstep) { |
146 | 178 | return PartitionBalancer.balancePartitionsAcrossWorkers( |
147 | |
conf, |
148 | |
partitionOwnerList, |
149 | |
allPartitionStatsList, |
150 | |
availableWorkerInfos); |
151 | |
} |
152 | |
|
153 | |
@Override |
154 | |
public PartitionStats createPartitionStats() { |
155 | 218 | return new PartitionStats(); |
156 | |
} |
157 | |
} |