Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
SccPhaseMasterCompute |
|
| 2.6;2.6 | ||||
SccPhaseMasterCompute$1 |
|
| 2.6;2.6 | ||||
SccPhaseMasterCompute$Phases |
|
| 2.6;2.6 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.examples.scc; | |
19 | ||
20 | import org.apache.giraph.aggregators.BooleanOrAggregator; | |
21 | import org.apache.giraph.aggregators.IntOverwriteAggregator; | |
22 | import org.apache.giraph.master.DefaultMasterCompute; | |
23 | import org.apache.hadoop.io.BooleanWritable; | |
24 | import org.apache.hadoop.io.IntWritable; | |
25 | ||
26 | /** | |
27 | * This master compute keeps track of which phase is currently being executed by | |
28 | * the Strongly Connected Components computation. The phases are the | |
29 | * following: 1 - Transpose (comprises 2 supersteps, one to propagate parent | |
30 | * vertex ids and another one to store them by their respective children) 2 - | |
31 | * Trimming (this phase can happen multiple times) 3 - Forward Traversal 4 - | |
32 | * Backward Traversal | |
33 | */ | |
34 | 0 | public class SccPhaseMasterCompute extends DefaultMasterCompute { |
35 | ||
36 | /** | |
37 | * Aggregator that stores the current phase | |
38 | */ | |
39 | public static final String PHASE = "scccompute.phase"; | |
40 | ||
41 | /** | |
42 | * Flags whether a new maximum was found in the Forward Traversal phase | |
43 | */ | |
44 | public static final String NEW_MAXIMUM = "scccompute.max"; | |
45 | ||
46 | /** | |
47 | * Flags whether a vertex converged in the Backward Traversal phase | |
48 | */ | |
49 | public static final String CONVERGED = "scccompute.converged"; | |
50 | ||
51 | /** | |
52 | * Enumerates the possible phases of the algorithm. | |
53 | */ | |
54 | 0 | public enum Phases { |
55 | /** Transpose and Trimming phases **/ | |
56 | 0 | TRANSPOSE, TRIMMING, |
57 | /** Maximum id propagation **/ | |
58 | 0 | FORWARD_TRAVERSAL, |
59 | /** Vertex convergence in SCC **/ | |
60 | 0 | BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST |
61 | }; | |
62 | ||
63 | @Override | |
64 | public void initialize() throws InstantiationException, | |
65 | IllegalAccessException { | |
66 | 0 | registerPersistentAggregator(PHASE, IntOverwriteAggregator.class); |
67 | 0 | registerAggregator(NEW_MAXIMUM, BooleanOrAggregator.class); |
68 | 0 | registerAggregator(CONVERGED, BooleanOrAggregator.class); |
69 | 0 | } |
70 | ||
71 | @Override | |
72 | public void compute() { | |
73 | 0 | if (getSuperstep() == 0) { |
74 | 0 | setPhase(Phases.TRANSPOSE); |
75 | } else { | |
76 | 0 | Phases currPhase = getPhase(); |
77 | 0 | switch (currPhase) { |
78 | case TRANSPOSE: | |
79 | 0 | setPhase(Phases.TRIMMING); |
80 | 0 | break; |
81 | case TRIMMING : | |
82 | 0 | setPhase(Phases.FORWARD_TRAVERSAL); |
83 | 0 | break; |
84 | case FORWARD_TRAVERSAL : | |
85 | 0 | BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM); |
86 | // If no new maximum value was found it means the propagation | |
87 | // converged, so we can move to the next phase | |
88 | 0 | if (!newMaxFound.get()) { |
89 | 0 | setPhase(Phases.BACKWARD_TRAVERSAL_START); |
90 | } | |
91 | break; | |
92 | case BACKWARD_TRAVERSAL_START : | |
93 | 0 | setPhase(Phases.BACKWARD_TRAVERSAL_REST); |
94 | 0 | break; |
95 | case BACKWARD_TRAVERSAL_REST : | |
96 | 0 | BooleanWritable converged = getAggregatedValue(CONVERGED); |
97 | 0 | if (!converged.get()) { |
98 | 0 | setPhase(Phases.TRANSPOSE); |
99 | } | |
100 | break; | |
101 | default : | |
102 | break; | |
103 | } | |
104 | } | |
105 | 0 | } |
106 | ||
107 | /** | |
108 | * Sets the next phase of the algorithm. | |
109 | * @param phase | |
110 | * Next phase. | |
111 | */ | |
112 | private void setPhase(Phases phase) { | |
113 | 0 | setAggregatedValue(PHASE, new IntWritable(phase.ordinal())); |
114 | 0 | } |
115 | ||
116 | /** | |
117 | * Get current phase. | |
118 | * @return Current phase as a Phases enum constant. | |
119 | */ | |
120 | private Phases getPhase() { | |
121 | 0 | IntWritable phaseInt = getAggregatedValue(PHASE); |
122 | 0 | return getPhase(phaseInt); |
123 | } | |
124 | ||
125 | /** | |
126 | * Helper function to convert the internal aggregated value into a Phases | |
127 | * enum constant. | |
128 | * @param phaseInt | |
129 | * An integer that matches the ordinal of a constant in the Phases enum. | |
130 | * @return The Phases constant at the given position. | |
131 | */ | |
132 | public static Phases getPhase(IntWritable phaseInt) { | |
133 | 0 | return Phases.values()[phaseInt.get()]; |
134 | } | |
135 | ||
136 | } |