Coverage Report - org.apache.giraph.examples.block_app.SimpleMigrationMasterBlockFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleMigrationMasterBlockFactory
0%
0/6
N/A
1.143
SimpleMigrationMasterBlockFactory$SimpleMigrationMasterComputation
0%
0/11
0%
0/2
1.143
SimpleMigrationMasterBlockFactory$SimpleMigrationMasterCompute
0%
0/8
0%
0/2
1.143
SimpleMigrationMasterBlockFactory$SimpleMigrationMasterWorkerContext
0%
0/8
N/A
1.143
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.examples.block_app;
 19  
 
 20  
 import java.io.IOException;
 21  
 
 22  
 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
 23  
 import org.apache.giraph.block_app.framework.block.Block;
 24  
 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation;
 25  
 import org.apache.giraph.block_app.migration.MigrationFullBlockFactory;
 26  
 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
 27  
 import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
 28  
 import org.apache.giraph.conf.GiraphConfiguration;
 29  
 import org.apache.giraph.graph.Vertex;
 30  
 import org.apache.hadoop.io.DoubleWritable;
 31  
 import org.apache.hadoop.io.FloatWritable;
 32  
 import org.apache.hadoop.io.LongWritable;
 33  
 import org.apache.log4j.Logger;
 34  
 
 35  
 /**
 36  
  * Demonstrates using migration library for Blocks Framework,
 37  
  * as an drop-in replacement, without any changes.
 38  
  */
 39  0
 public class SimpleMigrationMasterBlockFactory
 40  
     extends MigrationFullBlockFactory {
 41  
   @Override
 42  
   public Block createBlock(GiraphConfiguration conf) {
 43  0
     return createMigrationAppBlock(
 44  
         SimpleMigrationMasterComputation.class,
 45  
         new SimpleMigrationMasterCompute(),
 46  
         DoubleWritable.class,
 47  
         null,
 48  
         conf);
 49  
   }
 50  
 
 51  
   @Override
 52  
   protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
 53  0
     return LongWritable.class;
 54  
   }
 55  
 
 56  
   @Override
 57  
   protected Class<DoubleWritable> getVertexValueClass(
 58  
       GiraphConfiguration conf) {
 59  0
     return DoubleWritable.class;
 60  
   }
 61  
 
 62  
   @Override
 63  
   protected Class<FloatWritable> getEdgeValueClass(GiraphConfiguration conf) {
 64  0
     return FloatWritable.class;
 65  
   }
 66  
 
 67  
   @Override
 68  
   protected
 69  
   Class<SimpleMigrationMasterWorkerContext> getWorkerContextValueClass(
 70  
       GiraphConfiguration conf) {
 71  0
     return SimpleMigrationMasterWorkerContext.class;
 72  
   }
 73  
 
 74  
   // Full copy from org.apache.giraph.examples.SimpleMasterComputeComputation
 75  
   // Just extending MigrationFull drop-in replacements instead.
 76  
 
 77  
   /**
 78  
    * Demonstrates a computation with a centralized part implemented via a
 79  
    * MasterCompute.
 80  
    */
 81  0
   public static class SimpleMigrationMasterComputation
 82  
       extends MigrationFullBasicComputation<LongWritable, DoubleWritable,
 83  
       FloatWritable, DoubleWritable> {
 84  
     /** Aggregator to get values from the master to the workers */
 85  
     public static final String SMC_AGG = "simplemastercompute.aggregator";
 86  
 
 87  
     /** Logger */
 88  0
     private static final Logger LOG =
 89  0
         Logger.getLogger(SimpleMigrationMasterComputation.class);
 90  
 
 91  
     @Override
 92  
     public void compute(
 93  
         Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
 94  
         Iterable<DoubleWritable> messages) throws IOException {
 95  0
       double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
 96  0
       double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
 97  0
       double newSum = oldSum + newValue;
 98  0
       vertex.setValue(new DoubleWritable(newSum));
 99  0
       SimpleMigrationMasterWorkerContext workerContext = getWorkerContext();
 100  0
       workerContext.setFinalSum(newSum);
 101  0
       LOG.info("Current sum: " + newSum);
 102  0
     }
 103  
   }
 104  
 
 105  
   /**
 106  
    * Worker context used with {@link SimpleMigrationMasterComputation}.
 107  
    */
 108  0
   public static class SimpleMigrationMasterWorkerContext
 109  
       extends MigrationFullWorkerContext {
 110  
     /** Final sum value for verification for local jobs */
 111  
     private static double FINAL_SUM;
 112  
 
 113  
     @Override
 114  
     public void preApplication()
 115  
       throws InstantiationException, IllegalAccessException {
 116  0
     }
 117  
 
 118  
     @Override
 119  
     public void preSuperstep() {
 120  0
     }
 121  
 
 122  
     @Override
 123  
     public void postSuperstep() {
 124  0
     }
 125  
 
 126  
     @Override
 127  
     public void postApplication() {
 128  0
     }
 129  
 
 130  
     public static void setFinalSum(double sum) {
 131  0
       FINAL_SUM = sum;
 132  0
     }
 133  
 
 134  
     public static double getFinalSum() {
 135  0
       return FINAL_SUM;
 136  
     }
 137  
   }
 138  
 
 139  
   /**
 140  
    * MasterCompute used with {@link SimpleMigrationMasterComputation}.
 141  
    */
 142  0
   public static class SimpleMigrationMasterCompute
 143  
       extends MigrationFullMasterCompute {
 144  
     @Override
 145  
     public void compute() {
 146  0
       setAggregatedValue(SimpleMigrationMasterComputation.SMC_AGG,
 147  0
           new DoubleWritable(((double) getSuperstep()) / 2 + 1));
 148  0
       if (getSuperstep() == 10) {
 149  0
         haltComputation();
 150  
       }
 151  0
     }
 152  
 
 153  
     @Override
 154  
     public void initialize() throws InstantiationException,
 155  
         IllegalAccessException {
 156  0
       registerAggregator(SimpleMigrationMasterComputation.SMC_AGG,
 157  
           DoubleOverwriteAggregator.class);
 158  0
     }
 159  
   }
 160  
 }