Coverage Report - org.apache.giraph.examples.SimpleCheckpoint
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleCheckpoint
0%
0/50
0%
0/14
2.429
SimpleCheckpoint$SimpleCheckpointComputation
0%
0/44
0%
0/14
2.429
SimpleCheckpoint$SimpleCheckpointVertexMasterCompute
0%
0/3
N/A
2.429
SimpleCheckpoint$SimpleCheckpointVertexWorkerContext
0%
0/19
N/A
2.429
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.commons.cli.CommandLine;
 22  
 import org.apache.commons.cli.CommandLineParser;
 23  
 import org.apache.commons.cli.HelpFormatter;
 24  
 import org.apache.commons.cli.Options;
 25  
 import org.apache.commons.cli.PosixParser;
 26  
 import org.apache.giraph.aggregators.LongSumAggregator;
 27  
 import org.apache.giraph.graph.BasicComputation;
 28  
 import org.apache.giraph.edge.Edge;
 29  
 import org.apache.giraph.edge.EdgeFactory;
 30  
 import org.apache.giraph.io.formats.FileOutputFormatUtil;
 31  
 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
 32  
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 33  
 import org.apache.giraph.job.GiraphJob;
 34  
 import org.apache.giraph.master.DefaultMasterCompute;
 35  
 import org.apache.giraph.graph.Vertex;
 36  
 import org.apache.giraph.worker.WorkerContext;
 37  
 import org.apache.hadoop.conf.Configuration;
 38  
 import org.apache.hadoop.fs.Path;
 39  
 import org.apache.hadoop.io.FloatWritable;
 40  
 import org.apache.hadoop.io.IntWritable;
 41  
 import org.apache.hadoop.io.LongWritable;
 42  
 import org.apache.hadoop.util.Tool;
 43  
 import org.apache.hadoop.util.ToolRunner;
 44  
 import org.apache.log4j.Logger;
 45  
 
 46  
 import java.io.IOException;
 47  
 
 48  
 /**
 49  
  * An example that simply uses its id, value, and edges to compute new data
 50  
  * every iteration to verify that checkpoint restarting works.  Fault injection
 51  
  * can also test automated checkpoint restarts.
 52  
  */
 53  0
 public class SimpleCheckpoint implements Tool {
 54  
   /** Which superstep to cause the worker to fail */
 55  
   public static final int FAULTING_SUPERSTEP = 4;
 56  
   /** Vertex id to fault on */
 57  
   public static final long FAULTING_VERTEX_ID = 1;
 58  
   /** Dynamically set number of supersteps */
 59  
   public static final String SUPERSTEP_COUNT =
 60  
       "simpleCheckpointVertex.superstepCount";
 61  
   /** Should fault? */
 62  
   public static final String ENABLE_FAULT =
 63  
       "simpleCheckpointVertex.enableFault";
 64  
   /** Class logger */
 65  0
   private static final Logger LOG =
 66  0
       Logger.getLogger(SimpleCheckpoint.class);
 67  
   /** Configuration */
 68  
   private Configuration conf;
 69  
 
 70  
   /**
 71  
    * Actual computation.
 72  
    */
 73  0
   public static class SimpleCheckpointComputation extends
 74  
       BasicComputation<LongWritable, IntWritable, FloatWritable,
 75  
           FloatWritable> {
 76  
     @Override
 77  
     public void compute(
 78  
         Vertex<LongWritable, IntWritable, FloatWritable> vertex,
 79  
         Iterable<FloatWritable> messages) throws IOException {
 80  0
       SimpleCheckpointVertexWorkerContext workerContext = getWorkerContext();
 81  
 
 82  0
       boolean enableFault = workerContext.getEnableFault();
 83  0
       int supersteps = workerContext.getSupersteps();
 84  
 
 85  0
       if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
 86  0
           (getContext().getTaskAttemptID().getId() == 0) &&
 87  0
           (vertex.getId().get() == FAULTING_VERTEX_ID)) {
 88  0
         LOG.info("compute: Forced a fault on the first " +
 89  
             "attempt of superstep " +
 90  
             FAULTING_SUPERSTEP + " and vertex id " +
 91  
             FAULTING_VERTEX_ID);
 92  0
         System.exit(-1);
 93  
       }
 94  0
       if (getSuperstep() > supersteps) {
 95  0
         vertex.voteToHalt();
 96  0
         return;
 97  
       }
 98  0
       long sumAgg = this.<LongWritable>getAggregatedValue(
 99  0
           LongSumAggregator.class.getName()).get();
 100  0
       LOG.info("compute: " + sumAgg);
 101  0
       aggregate(LongSumAggregator.class.getName(),
 102  0
           new LongWritable(vertex.getId().get()));
 103  0
       LOG.info("compute: sum = " + sumAgg +
 104  0
           " for vertex " + vertex.getId());
 105  0
       float msgValue = 0.0f;
 106  0
       for (FloatWritable message : messages) {
 107  0
         float curMsgValue = message.get();
 108  0
         msgValue += curMsgValue;
 109  0
         LOG.info("compute: got msgValue = " + curMsgValue +
 110  0
             " for vertex " + vertex.getId() +
 111  0
             " on superstep " + getSuperstep());
 112  0
       }
 113  0
       int vertexValue = vertex.getValue().get();
 114  0
       vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
 115  0
       LOG.info("compute: vertex " + vertex.getId() +
 116  0
           " has value " + vertex.getValue() +
 117  0
           " on superstep " + getSuperstep());
 118  0
       for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
 119  0
         FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
 120  
             (float) vertexValue);
 121  0
         Edge<LongWritable, FloatWritable> newEdge =
 122  0
             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
 123  0
         LOG.info("compute: vertex " + vertex.getId() +
 124  0
             " sending edgeValue " + edge.getValue() +
 125  
             " vertexValue " + vertexValue +
 126  
             " total " + newEdgeValue +
 127  0
             " to vertex " + edge.getTargetVertexId() +
 128  0
             " on superstep " + getSuperstep());
 129  0
         vertex.addEdge(newEdge);
 130  0
         sendMessage(edge.getTargetVertexId(), newEdgeValue);
 131  0
       }
 132  0
     }
 133  
   }
 134  
 
 135  
   /**
 136  
    * Worker context associated with {@link SimpleCheckpoint}.
 137  
    */
 138  0
   public static class SimpleCheckpointVertexWorkerContext
 139  
       extends WorkerContext {
 140  
     /** Filename to indicate whether a fault was found */
 141  
     public static final String FAULT_FILE = "/tmp/faultFile";
 142  
     /** User can access this after the application finishes if local */
 143  
     private static long FINAL_SUM;
 144  
     /** Number of supersteps to run (6 by default) */
 145  0
     private int supersteps = 6;
 146  
     /** Enable the fault at the particular vertex id and superstep? */
 147  0
     private boolean enableFault = false;
 148  
 
 149  
     public static long getFinalSum() {
 150  0
       return FINAL_SUM;
 151  
     }
 152  
 
 153  
     @Override
 154  
     public void preApplication()
 155  
       throws InstantiationException, IllegalAccessException {
 156  0
       supersteps = getContext().getConfiguration()
 157  0
           .getInt(SUPERSTEP_COUNT, supersteps);
 158  0
       enableFault = getContext().getConfiguration()
 159  0
           .getBoolean(ENABLE_FAULT, false);
 160  0
     }
 161  
 
 162  
     @Override
 163  
     public void postApplication() {
 164  0
       setFinalSum(this.<LongWritable>getAggregatedValue(
 165  0
           LongSumAggregator.class.getName()).get());
 166  0
       LOG.info("FINAL_SUM=" + FINAL_SUM);
 167  0
     }
 168  
 
 169  
     /**
 170  
      * Set the final sum
 171  
      *
 172  
      * @param value sum
 173  
      */
 174  
     private static void setFinalSum(long value) {
 175  0
       FINAL_SUM = value;
 176  0
     }
 177  
 
 178  
     @Override
 179  
     public void preSuperstep() {
 180  0
     }
 181  
 
 182  
     @Override
 183  0
     public void postSuperstep() { }
 184  
 
 185  
     public int getSupersteps() {
 186  0
       return this.supersteps;
 187  
     }
 188  
 
 189  
     public boolean getEnableFault() {
 190  0
       return this.enableFault;
 191  
     }
 192  
   }
 193  
 
 194  
   @Override
 195  
   public int run(String[] args) throws Exception {
 196  0
     Options options = new Options();
 197  0
     options.addOption("h", "help", false, "Help");
 198  0
     options.addOption("v", "verbose", false, "Verbose");
 199  0
     options.addOption("w",
 200  
         "workers",
 201  
         true,
 202  
         "Number of workers");
 203  0
     options.addOption("s",
 204  
         "supersteps",
 205  
         true,
 206  
         "Supersteps to execute before finishing");
 207  0
     options.addOption("w",
 208  
         "workers",
 209  
         true,
 210  
         "Minimum number of workers");
 211  0
     options.addOption("o",
 212  
         "outputDirectory",
 213  
         true,
 214  
         "Output directory");
 215  0
     HelpFormatter formatter = new HelpFormatter();
 216  0
     if (args.length == 0) {
 217  0
       formatter.printHelp(getClass().getName(), options, true);
 218  0
       return 0;
 219  
     }
 220  0
     CommandLineParser parser = new PosixParser();
 221  0
     CommandLine cmd = parser.parse(options, args);
 222  0
     if (cmd.hasOption('h')) {
 223  0
       formatter.printHelp(getClass().getName(), options, true);
 224  0
       return 0;
 225  
     }
 226  0
     if (!cmd.hasOption('w')) {
 227  0
       LOG.info("Need to choose the number of workers (-w)");
 228  0
       return -1;
 229  
     }
 230  0
     if (!cmd.hasOption('o')) {
 231  0
       LOG.info("Need to set the output directory (-o)");
 232  0
       return -1;
 233  
     }
 234  
 
 235  0
     GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
 236  0
     bspJob.getConfiguration().setComputationClass(
 237  
         SimpleCheckpointComputation.class);
 238  0
     bspJob.getConfiguration().setVertexInputFormatClass(
 239  
         GeneratedVertexInputFormat.class);
 240  0
     bspJob.getConfiguration().setVertexOutputFormatClass(
 241  
         IdWithValueTextOutputFormat.class);
 242  0
     bspJob.getConfiguration().setWorkerContextClass(
 243  
         SimpleCheckpointVertexWorkerContext.class);
 244  0
     bspJob.getConfiguration().setMasterComputeClass(
 245  
         SimpleCheckpointVertexMasterCompute.class);
 246  0
     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
 247  0
     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
 248  0
     bspJob.getConfiguration().setWorkerConfiguration(
 249  
         minWorkers, maxWorkers, 100.0f);
 250  
 
 251  0
     FileOutputFormatUtil.setOutputPath(bspJob.getInternalJob(),
 252  0
                                    new Path(cmd.getOptionValue('o')));
 253  0
     boolean verbose = false;
 254  0
     if (cmd.hasOption('v')) {
 255  0
       verbose = true;
 256  
     }
 257  0
     if (cmd.hasOption('s')) {
 258  0
       getConf().setInt(SUPERSTEP_COUNT,
 259  0
           Integer.parseInt(cmd.getOptionValue('s')));
 260  
     }
 261  0
     if (bspJob.run(verbose)) {
 262  0
       return 0;
 263  
     } else {
 264  0
       return -1;
 265  
     }
 266  
   }
 267  
 
 268  
   /**
 269  
    * Master compute associated with {@link SimpleCheckpoint}.
 270  
    * It registers required aggregators.
 271  
    */
 272  0
   public static class SimpleCheckpointVertexMasterCompute extends
 273  
       DefaultMasterCompute {
 274  
     @Override
 275  
     public void initialize() throws InstantiationException,
 276  
         IllegalAccessException {
 277  0
       registerAggregator(LongSumAggregator.class.getName(),
 278  
           LongSumAggregator.class);
 279  0
     }
 280  
   }
 281  
 
 282  
   /**
 283  
    * Executable from the command line.
 284  
    *
 285  
    * @param args Command line args.
 286  
    * @throws Exception
 287  
    */
 288  
   public static void main(String[] args) throws Exception {
 289  0
     System.exit(ToolRunner.run(new SimpleCheckpoint(), args));
 290  0
   }
 291  
 
 292  
   @Override
 293  
   public Configuration getConf() {
 294  0
     return conf;
 295  
   }
 296  
 
 297  
   @Override
 298  
   public void setConf(Configuration conf) {
 299  0
     this.conf = conf;
 300  0
   }
 301  
 }