Coverage Report - org.apache.giraph.examples.SimpleMasterComputeComputation
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleMasterComputeComputation
0%
0/11
0%
0/2
1.222
SimpleMasterComputeComputation$SimpleMasterCompute
0%
0/8
0%
0/2
1.222
SimpleMasterComputeComputation$SimpleMasterComputeWorkerContext
0%
0/8
N/A
1.222
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
 22  
 import org.apache.giraph.graph.BasicComputation;
 23  
 import org.apache.giraph.master.DefaultMasterCompute;
 24  
 import org.apache.giraph.graph.Vertex;
 25  
 import org.apache.giraph.worker.WorkerContext;
 26  
 import org.apache.hadoop.io.DoubleWritable;
 27  
 import org.apache.hadoop.io.FloatWritable;
 28  
 import org.apache.hadoop.io.LongWritable;
 29  
 import org.apache.log4j.Logger;
 30  
 
 31  
 import java.io.IOException;
 32  
 
 33  
 /**
 34  
  * Demonstrates a computation with a centralized part implemented via a
 35  
  * MasterCompute.
 36  
  */
 37  0
 public class SimpleMasterComputeComputation extends BasicComputation<
 38  
     LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
 39  
   /** Aggregator to get values from the master to the workers */
 40  
   public static final String SMC_AGG = "simplemastercompute.aggregator";
 41  
   /** Logger */
 42  0
   private static final Logger LOG =
 43  0
       Logger.getLogger(SimpleMasterComputeComputation.class);
 44  
 
 45  
   @Override
 46  
   public void compute(
 47  
       Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
 48  
       Iterable<DoubleWritable> messages) throws IOException {
 49  0
     double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
 50  0
     double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
 51  0
     double newSum = oldSum + newValue;
 52  0
     vertex.setValue(new DoubleWritable(newSum));
 53  0
     SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
 54  0
     workerContext.setFinalSum(newSum);
 55  0
     LOG.info("Current sum: " + newSum);
 56  0
   }
 57  
 
 58  
   /**
 59  
    * Worker context used with {@link SimpleMasterComputeComputation}.
 60  
    */
 61  0
   public static class SimpleMasterComputeWorkerContext
 62  
       extends WorkerContext {
 63  
     /** Final sum value for verification for local jobs */
 64  
     private static double FINAL_SUM;
 65  
 
 66  
     @Override
 67  
     public void preApplication()
 68  
       throws InstantiationException, IllegalAccessException {
 69  0
     }
 70  
 
 71  
     @Override
 72  
     public void preSuperstep() {
 73  0
     }
 74  
 
 75  
     @Override
 76  
     public void postSuperstep() {
 77  0
     }
 78  
 
 79  
     @Override
 80  
     public void postApplication() {
 81  0
     }
 82  
 
 83  
     public static void setFinalSum(double sum) {
 84  0
       FINAL_SUM = sum;
 85  0
     }
 86  
 
 87  
     public static double getFinalSum() {
 88  0
       return FINAL_SUM;
 89  
     }
 90  
   }
 91  
 
 92  
   /**
 93  
    * MasterCompute used with {@link SimpleMasterComputeComputation}.
 94  
    */
 95  0
   public static class SimpleMasterCompute
 96  
       extends DefaultMasterCompute {
 97  
     @Override
 98  
     public void compute() {
 99  0
       setAggregatedValue(SMC_AGG,
 100  0
           new DoubleWritable(((double) getSuperstep()) / 2 + 1));
 101  0
       if (getSuperstep() == 10) {
 102  0
         haltComputation();
 103  
       }
 104  0
     }
 105  
 
 106  
     @Override
 107  
     public void initialize() throws InstantiationException,
 108  
         IllegalAccessException {
 109  0
       registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
 110  0
     }
 111  
   }
 112  
 }