Coverage Report - org.apache.giraph.examples.SimpleVertexWithWorkerContext
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleVertexWithWorkerContext
0%
0/18
0%
0/4
2.8
SimpleVertexWithWorkerContext$EmitterWorkerContext
0%
0/34
0%
0/8
2.8
SimpleVertexWithWorkerContext$SimpleComputation
0%
0/8
0%
0/2
2.8
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import org.apache.giraph.graph.BasicComputation;
 22  
 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
 23  
 import org.apache.giraph.graph.Vertex;
 24  
 import org.apache.giraph.job.GiraphJob;
 25  
 import org.apache.giraph.worker.WorkerContext;
 26  
 import org.apache.hadoop.conf.Configuration;
 27  
 import org.apache.hadoop.fs.FileSystem;
 28  
 import org.apache.hadoop.fs.Path;
 29  
 import org.apache.hadoop.io.DoubleWritable;
 30  
 import org.apache.hadoop.io.FloatWritable;
 31  
 import org.apache.hadoop.io.IntWritable;
 32  
 import org.apache.hadoop.io.LongWritable;
 33  
 import org.apache.hadoop.mapreduce.Mapper.Context;
 34  
 import org.apache.hadoop.util.Tool;
 35  
 import org.apache.hadoop.util.ToolRunner;
 36  
 
 37  
 import java.io.DataOutputStream;
 38  
 import java.io.IOException;
 39  
 
 40  
 /**
 41  
  * Fully runnable example of how to
 42  
  * emit worker data to HDFS during a graph
 43  
  * computation.
 44  
  */
 45  0
 public class SimpleVertexWithWorkerContext implements Tool {
 46  
   /** Directory name of where to write. */
 47  
   public static final String OUTPUTDIR = "svwwc.outputdir";
 48  
   /** Halting condition for the number of supersteps */
 49  
   private static final int TESTLENGTH = 30;
 50  
   /** Configuration */
 51  
   private Configuration conf;
 52  
 
 53  
   @Override
 54  
   public void setConf(Configuration conf) {
 55  0
     this.conf = conf;
 56  0
   }
 57  
 
 58  
   @Override
 59  
   public Configuration getConf() {
 60  0
     return conf;
 61  
   }
 62  
 
 63  
   /**
 64  
    * Actual vetex implementation
 65  
    */
 66  0
   public static class SimpleComputation extends BasicComputation<LongWritable,
 67  
       IntWritable, FloatWritable, DoubleWritable> {
 68  
     @Override
 69  
     public void compute(
 70  
         Vertex<LongWritable, IntWritable, FloatWritable> vertex,
 71  
         Iterable<DoubleWritable> messages) throws IOException {
 72  
 
 73  0
       long superstep = getSuperstep();
 74  
 
 75  0
       if (superstep < TESTLENGTH) {
 76  0
         EmitterWorkerContext emitter = getWorkerContext();
 77  0
         emitter.emit("vertexId=" + vertex.getId() +
 78  
             " superstep=" + superstep + "\n");
 79  0
       } else {
 80  0
         vertex.voteToHalt();
 81  
       }
 82  0
     }
 83  
   }
 84  
 
 85  
   /**
 86  
    * Example worker context to emit data as part of a superstep.
 87  
    */
 88  
   @SuppressWarnings("rawtypes")
 89  0
   public static class EmitterWorkerContext extends WorkerContext {
 90  
     /** File name prefix */
 91  
     private static final String FILENAME = "emitter_";
 92  
     /** Output stream to dump the strings. */
 93  
     private DataOutputStream out;
 94  
 
 95  
     @Override
 96  
     public void preApplication() {
 97  0
       Context context = getContext();
 98  
       FileSystem fs;
 99  
 
 100  
       try {
 101  0
         fs = FileSystem.get(context.getConfiguration());
 102  
 
 103  0
         String p = context.getConfiguration()
 104  0
             .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
 105  0
         if (p == null) {
 106  0
           throw new IllegalArgumentException(
 107  
               SimpleVertexWithWorkerContext.OUTPUTDIR +
 108  
               " undefined!");
 109  
         }
 110  
 
 111  0
         Path path = new Path(p);
 112  0
         if (!fs.exists(path)) {
 113  0
           throw new IllegalArgumentException(path +
 114  
               " doesn't exist");
 115  
         }
 116  
 
 117  0
         Path outF = new Path(path, FILENAME +
 118  0
             context.getTaskAttemptID());
 119  0
         if (fs.exists(outF)) {
 120  0
           throw new IllegalArgumentException(outF +
 121  
               " aready exists");
 122  
         }
 123  
 
 124  0
         out = fs.create(outF);
 125  0
       } catch (IOException e) {
 126  0
         throw new RuntimeException(
 127  
             "can't initialize WorkerContext", e);
 128  0
       }
 129  0
     }
 130  
 
 131  
     @Override
 132  
     public void postApplication() {
 133  0
       if (out != null) {
 134  
         try {
 135  0
           out.flush();
 136  0
           out.close();
 137  0
         } catch (IOException e) {
 138  0
           throw new RuntimeException(
 139  
               "can't finalize WorkerContext", e);
 140  0
         }
 141  0
         out = null;
 142  
       }
 143  0
     }
 144  
 
 145  
     @Override
 146  0
     public void preSuperstep() { }
 147  
 
 148  
     @Override
 149  0
     public void postSuperstep() { }
 150  
 
 151  
     /**
 152  
      * Write this string to the output stream.
 153  
      *
 154  
      * @param s String to dump.
 155  
      */
 156  
     public void emit(String s) {
 157  
       try {
 158  0
         out.writeUTF(s);
 159  0
       } catch (IOException e) {
 160  0
         throw new RuntimeException("can't emit", e);
 161  0
       }
 162  0
     }
 163  
   }
 164  
 
 165  
   @Override
 166  
   public int run(String[] args) throws Exception {
 167  0
     if (args.length != 2) {
 168  0
       throw new IllegalArgumentException(
 169  
           "run: Must have 2 arguments <output path> <# of workers>");
 170  
     }
 171  0
     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
 172  0
     job.getConfiguration().setComputationClass(SimpleComputation.class);
 173  0
     job.getConfiguration().setVertexInputFormatClass(
 174  
         SimpleSuperstepVertexInputFormat.class);
 175  0
     job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
 176  0
     job.getConfiguration().set(
 177  
         SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
 178  0
     job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
 179  0
         Integer.parseInt(args[1]),
 180  
         100.0f);
 181  0
     if (job.run(true)) {
 182  0
       return 0;
 183  
     } else {
 184  0
       return -1;
 185  
     }
 186  
   }
 187  
 
 188  
   /**
 189  
    * Executable from the command line.
 190  
    *
 191  
    * @param args Command line arguments.
 192  
    * @throws Exception
 193  
    */
 194  
   public static void main(String[] args) throws Exception {
 195  0
     System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
 196  0
   }
 197  
 }