Coverage Report - org.apache.giraph.examples.SimpleCheckpointVertex
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleCheckpointVertex
41%
32/77
25%
7/28
2.538
SimpleCheckpointVertex$SimpleCheckpointVertexMasterCompute
100%
3/3
N/A
2.538
SimpleCheckpointVertex$SimpleCheckpointVertexWorkerContext
100%
14/14
N/A
2.538
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.commons.cli.CommandLine;
 22  
 import org.apache.commons.cli.CommandLineParser;
 23  
 import org.apache.commons.cli.HelpFormatter;
 24  
 import org.apache.commons.cli.Options;
 25  
 import org.apache.commons.cli.PosixParser;
 26  
 import org.apache.giraph.aggregators.LongSumAggregator;
 27  
 import org.apache.giraph.graph.DefaultMasterCompute;
 28  
 import org.apache.giraph.graph.Edge;
 29  
 import org.apache.giraph.graph.EdgeListVertex;
 30  
 import org.apache.giraph.graph.GiraphJob;
 31  
 import org.apache.giraph.graph.WorkerContext;
 32  
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 33  
 import org.apache.giraph.io.IdWithValueTextOutputFormat;
 34  
 import org.apache.hadoop.conf.Configuration;
 35  
 import org.apache.hadoop.fs.Path;
 36  
 import org.apache.hadoop.io.FloatWritable;
 37  
 import org.apache.hadoop.io.IntWritable;
 38  
 import org.apache.hadoop.io.LongWritable;
 39  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 40  
 import org.apache.hadoop.util.Tool;
 41  
 import org.apache.hadoop.util.ToolRunner;
 42  
 import org.apache.log4j.Logger;
 43  
 
 44  
 /**
 45  
  * An example that simply uses its id, value, and edges to compute new data
 46  
  * every iteration to verify that checkpoint restarting works.  Fault injection
 47  
  * can also test automated checkpoint restarts.
 48  
  */
 49  42
 public class SimpleCheckpointVertex extends
 50  
     EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
 51  
     implements Tool {
 52  
   /** Which superstep to cause the worker to fail */
 53  
   public static final int FAULTING_SUPERSTEP = 4;
 54  
   /** Vertex id to fault on */
 55  
   public static final long FAULTING_VERTEX_ID = 1;
 56  
   /** Dynamically set number of supersteps */
 57  
   public static final String SUPERSTEP_COUNT =
 58  
       "simpleCheckpointVertex.superstepCount";
 59  
   /** Should fault? */
 60  
   public static final String ENABLE_FAULT =
 61  
       "simpleCheckpointVertex.enableFault";
 62  
   /** Class logger */
 63  1
   private static final Logger LOG =
 64  
       Logger.getLogger(SimpleCheckpointVertex.class);
 65  
   /** Configuration */
 66  
   private Configuration conf;
 67  
 
 68  
   @Override
 69  
   public void compute(Iterable<FloatWritable> messages) {
 70  280
     SimpleCheckpointVertexWorkerContext workerContext =
 71  
         (SimpleCheckpointVertexWorkerContext) getWorkerContext();
 72  
 
 73  280
     boolean enableFault = workerContext.getEnableFault();
 74  280
     int supersteps = workerContext.getSupersteps();
 75  
 
 76  280
     if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
 77  
         (getContext().getTaskAttemptID().getId() == 0) &&
 78  
         (getId().get() == FAULTING_VERTEX_ID)) {
 79  0
       LOG.info("compute: Forced a fault on the first " +
 80  
           "attempt of superstep " +
 81  
           FAULTING_SUPERSTEP + " and vertex id " +
 82  
           FAULTING_VERTEX_ID);
 83  0
       System.exit(-1);
 84  
     }
 85  280
     if (getSuperstep() > supersteps) {
 86  35
       voteToHalt();
 87  35
       return;
 88  
     }
 89  245
     long sumAgg = this.<LongWritable>getAggregatedValue(
 90  
         LongSumAggregator.class.getName()).get();
 91  245
     LOG.info("compute: " + sumAgg);
 92  245
     aggregate(LongSumAggregator.class.getName(),
 93  
         new LongWritable(getId().get()));
 94  245
     LOG.info("compute: sum = " + sumAgg +
 95  
         " for vertex " + getId());
 96  245
     float msgValue = 0.0f;
 97  245
     for (FloatWritable message : messages) {
 98  210
       float curMsgValue = message.get();
 99  210
       msgValue += curMsgValue;
 100  210
       LOG.info("compute: got msgValue = " + curMsgValue +
 101  
           " for vertex " + getId() +
 102  
           " on superstep " + getSuperstep());
 103  210
     }
 104  245
     int vertexValue = getValue().get();
 105  245
     setValue(new IntWritable(vertexValue + (int) msgValue));
 106  245
     LOG.info("compute: vertex " + getId() +
 107  
         " has value " + getValue() +
 108  
         " on superstep " + getSuperstep());
 109  245
     for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
 110  245
       FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
 111  
           (float) vertexValue);
 112  245
       LOG.info("compute: vertex " + getId() +
 113  
           " sending edgeValue " + edge.getValue() +
 114  
           " vertexValue " + vertexValue +
 115  
           " total " + newEdgeValue +
 116  
               " to vertex " + edge.getTargetVertexId() +
 117  
               " on superstep " + getSuperstep());
 118  245
       addEdge(edge.getTargetVertexId(), newEdgeValue);
 119  245
       sendMessage(edge.getTargetVertexId(), newEdgeValue);
 120  245
     }
 121  245
   }
 122  
 
 123  
   /**
 124  
    * Worker context associated with {@link SimpleCheckpointVertex}.
 125  
    */
 126  7
   public static class SimpleCheckpointVertexWorkerContext
 127  
       extends WorkerContext {
 128  
     /** Filename to indicate whether a fault was found */
 129  
     public static final String FAULT_FILE = "/tmp/faultFile";
 130  
     /** User can access this after the application finishes if local */
 131  
     private static long FINAL_SUM;
 132  
     /** Number of supersteps to run (6 by default) */
 133  7
     private int supersteps = 6;
 134  
     /** Enable the fault at the particular vertex id and superstep? */
 135  7
     private boolean enableFault = false;
 136  
 
 137  
     public static long getFinalSum() {
 138  2
       return FINAL_SUM;
 139  
     }
 140  
 
 141  
     @Override
 142  
     public void preApplication()
 143  
       throws InstantiationException, IllegalAccessException {
 144  7
       supersteps = getContext().getConfiguration()
 145  
           .getInt(SUPERSTEP_COUNT, supersteps);
 146  7
       enableFault = getContext().getConfiguration()
 147  
           .getBoolean(ENABLE_FAULT, false);
 148  7
     }
 149  
 
 150  
     @Override
 151  
     public void postApplication() {
 152  7
       FINAL_SUM = this.<LongWritable>getAggregatedValue(
 153  
           LongSumAggregator.class.getName()).get();
 154  7
       LOG.info("FINAL_SUM=" + FINAL_SUM);
 155  7
     }
 156  
 
 157  
     @Override
 158  
     public void preSuperstep() {
 159  56
     }
 160  
 
 161  
     @Override
 162  56
     public void postSuperstep() { }
 163  
 
 164  
     public int getSupersteps() {
 165  280
       return this.supersteps;
 166  
     }
 167  
 
 168  
     public boolean getEnableFault() {
 169  280
       return this.enableFault;
 170  
     }
 171  
   }
 172  
 
 173  
   @Override
 174  
   public int run(String[] args) throws Exception {
 175  0
     Options options = new Options();
 176  0
     options.addOption("h", "help", false, "Help");
 177  0
     options.addOption("v", "verbose", false, "Verbose");
 178  0
     options.addOption("w",
 179  
         "workers",
 180  
         true,
 181  
         "Number of workers");
 182  0
     options.addOption("s",
 183  
         "supersteps",
 184  
         true,
 185  
         "Supersteps to execute before finishing");
 186  0
     options.addOption("w",
 187  
         "workers",
 188  
         true,
 189  
         "Minimum number of workers");
 190  0
     options.addOption("o",
 191  
         "outputDirectory",
 192  
         true,
 193  
         "Output directory");
 194  0
     HelpFormatter formatter = new HelpFormatter();
 195  0
     if (args.length == 0) {
 196  0
       formatter.printHelp(getClass().getName(), options, true);
 197  0
       return 0;
 198  
     }
 199  0
     CommandLineParser parser = new PosixParser();
 200  0
     CommandLine cmd = parser.parse(options, args);
 201  0
     if (cmd.hasOption('h')) {
 202  0
       formatter.printHelp(getClass().getName(), options, true);
 203  0
       return 0;
 204  
     }
 205  0
     if (!cmd.hasOption('w')) {
 206  0
       LOG.info("Need to choose the number of workers (-w)");
 207  0
       return -1;
 208  
     }
 209  0
     if (!cmd.hasOption('o')) {
 210  0
       LOG.info("Need to set the output directory (-o)");
 211  0
       return -1;
 212  
     }
 213  
 
 214  0
     GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
 215  0
     bspJob.setVertexClass(getClass());
 216  0
     bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
 217  0
     bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
 218  0
     bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
 219  0
     bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
 220  0
     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
 221  0
     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
 222  0
     bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
 223  
 
 224  0
     FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
 225  
                                    new Path(cmd.getOptionValue('o')));
 226  0
     boolean verbose = false;
 227  0
     if (cmd.hasOption('v')) {
 228  0
       verbose = true;
 229  
     }
 230  0
     if (cmd.hasOption('s')) {
 231  0
       getConf().setInt(SUPERSTEP_COUNT,
 232  
           Integer.parseInt(cmd.getOptionValue('s')));
 233  
     }
 234  0
     if (bspJob.run(verbose)) {
 235  0
       return 0;
 236  
     } else {
 237  0
       return -1;
 238  
     }
 239  
   }
 240  
 
 241  
   /**
 242  
    * Master compute associated with {@link SimpleCheckpointVertex}.
 243  
    * It registers required aggregators.
 244  
    */
 245  35
   public static class SimpleCheckpointVertexMasterCompute extends
 246  
       DefaultMasterCompute {
 247  
     @Override
 248  
     public void initialize() throws InstantiationException,
 249  
         IllegalAccessException {
 250  7
       registerAggregator(LongSumAggregator.class.getName(),
 251  
           LongSumAggregator.class);
 252  7
     }
 253  
   }
 254  
 
 255  
   /**
 256  
    * Executable from the command line.
 257  
    *
 258  
    * @param args Command line args.
 259  
    * @throws Exception
 260  
    */
 261  
   public static void main(String[] args) throws Exception {
 262  0
     System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
 263  0
   }
 264  
 
 265  
   @Override
 266  
   public Configuration getConf() {
 267  0
     return conf;
 268  
   }
 269  
 
 270  
   @Override
 271  
   public void setConf(Configuration conf) {
 272  35
     this.conf = conf;
 273  35
   }
 274  
 }