Coverage Report - org.apache.giraph.utils.InternalVertexRunner
 
Classes in this File                       Line Coverage     Branch Coverage    Complexity
InternalVertexRunner                        98%  (50/51)     100% (4/4)         2
InternalVertexRunner$1                      66%  (4/6)       N/A                2
InternalVertexRunner$InternalZooKeeper     100%  (3/3)       N/A                2
 
   1       /*
   2        * Licensed to the Apache Software Foundation (ASF) under one
   3        * or more contributor license agreements.  See the NOTICE file
   4        * distributed with this work for additional information
   5        * regarding copyright ownership.  The ASF licenses this file
   6        * to you under the Apache License, Version 2.0 (the
   7        * "License"); you may not use this file except in compliance
   8        * with the License.  You may obtain a copy of the License at
   9        *
  10        *     http://www.apache.org/licenses/LICENSE-2.0
  11        *
  12        * Unless required by applicable law or agreed to in writing, software
  13        * distributed under the License is distributed on an "AS IS" BASIS,
  14        * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15        * See the License for the specific language governing permissions and
  16        * limitations under the License.
  17        */
  18
  19       package org.apache.giraph.utils;
  20
  21       import com.google.common.base.Charsets;
  22       import com.google.common.io.Files;
  23       import org.apache.giraph.graph.GiraphJob;
  24       import org.apache.hadoop.conf.Configuration;
  25       import org.apache.hadoop.fs.Path;
  26       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  27       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  28       import org.apache.zookeeper.server.ServerConfig;
  29       import org.apache.zookeeper.server.ZooKeeperServerMain;
  30       import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
  31
  32       import java.io.File;
  33       import java.io.IOException;
  34       import java.util.Map;
  35       import java.util.Properties;
  36       import java.util.concurrent.ExecutorService;
  37       import java.util.concurrent.Executors;
  38
  39       /**
  40        * A utility class for running internal tests on a vertex.
  41        *
  42        * Test code only has to invoke the run() method to test a vertex.
  43        * All data is written to a local tmp directory that is removed afterwards.
  44        * A local ZooKeeper instance is started in an extra thread and
  45        * shut down at the end.
  46        *
  47        * Heavily inspired by Apache Mahout's MahoutTestCase.
  48        */
  49       public class InternalVertexRunner {
  50         /** ZooKeeper port to use for tests */
  51         public static final int LOCAL_ZOOKEEPER_PORT = 22182;
  52
  53         /**
  54          * Default constructor.
  55          */
  56    0    private InternalVertexRunner() { }
  57
  58         /**
  59          * Attempts to run the vertex internally in the current JVM, reading from
  60          * and writing to a temporary folder on local disk. Will start
  61          * its own ZooKeeper instance.
  62          *
  63          * @param vertexClass the vertex class to instantiate
  64          * @param vertexInputFormatClass the inputformat to use
  65          * @param vertexOutputFormatClass the outputformat to use
  66          * @param params a map of parameters to add to the hadoop configuration
  67          * @param data linewise input data
  68          * @return linewise output data
  69          * @throws Exception
  70          */
  71         public static Iterable<String> run(Class<?> vertexClass,
  72             Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
  73             Map<String, String> params, String... data) throws Exception {
  74    1      return run(vertexClass, null, vertexInputFormatClass,
  75               vertexOutputFormatClass, params, data);
  76         }
  77
  78         /**
  79          * Attempts to run the vertex internally in the current JVM, reading from
  80          * and writing to a temporary folder on local disk. Will start its own
  81          * ZooKeeper instance.
  82          *
  83          * @param vertexClass the vertex class to instantiate
  84          * @param vertexCombinerClass the vertex combiner to use (or null)
  85          * @param vertexInputFormatClass the inputformat to use
  86          * @param vertexOutputFormatClass the outputformat to use
  87          * @param params a map of parameters to add to the hadoop configuration
  88          * @param data linewise input data
  89          * @return linewise output data
  90          * @throws Exception
  91          */
  92         public static Iterable<String> run(Class<?> vertexClass,
  93             Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
  94             Class<?> vertexOutputFormatClass, Map<String, String> params,
  95             String... data) throws Exception {
  96
  97    3      File tmpDir = null;
  98           try {
  99             // Prepare input file, output folder and temporary folders
 100    3        tmpDir = FileUtils.createTestDir(vertexClass);
 101    3        File inputFile = FileUtils.createTempFile(tmpDir, "graph.txt");
 102    3        File outputDir = FileUtils.createTempDir(tmpDir, "output");
 103    3        File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
 104    3        File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
 105    3        File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
 106
 107             // Write input data to disk
 108    3        FileUtils.writeLines(inputFile, data);
 109
 110             // Create and configure the job to run the vertex
 111    3        GiraphJob job = new GiraphJob(vertexClass.getName());
 112    3        job.setVertexClass(vertexClass);
 113    3        job.setVertexInputFormatClass(vertexInputFormatClass);
 114    3        job.setVertexOutputFormatClass(vertexOutputFormatClass);
 115
 116    3        if (vertexCombinerClass != null) {
 117    2          job.setVertexCombinerClass(vertexCombinerClass);
 118             }
 119
 120    3        job.setWorkerConfiguration(1, 1, 100.0f);
 121    3        Configuration conf = job.getConfiguration();
 122    3        conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
 123    3        conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
 124    3        conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
 125                 String.valueOf(LOCAL_ZOOKEEPER_PORT));
 126
 127    3        conf.set(GiraphJob.ZOOKEEPER_DIR, zkDir.toString());
 128    3        conf.set(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY,
 129                 zkMgrDir.toString());
 130    3        conf.set(GiraphJob.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
 131
 132    3        for (Map.Entry<String, String> param : params.entrySet()) {
 133    2          conf.set(param.getKey(), param.getValue());
 134             }
 135
 136    3        FileInputFormat.addInputPath(job.getInternalJob(),
 137                                          new Path(inputFile.toString()));
 138    3        FileOutputFormat.setOutputPath(job.getInternalJob(),
 139                                            new Path(outputDir.toString()));
 140
 141             // Configure a local zookeeper instance
 142    3        Properties zkProperties = new Properties();
 143    3        zkProperties.setProperty("tickTime", "2000");
 144    3        zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
 145    3        zkProperties.setProperty("clientPort",
 146                 String.valueOf(LOCAL_ZOOKEEPER_PORT));
 147    3        zkProperties.setProperty("maxClientCnxns", "10000");
 148    3        zkProperties.setProperty("minSessionTimeout", "10000");
 149    3        zkProperties.setProperty("maxSessionTimeout", "100000");
 150    3        zkProperties.setProperty("initLimit", "10");
 151    3        zkProperties.setProperty("syncLimit", "5");
 152    3        zkProperties.setProperty("snapCount", "50000");
 153
 154    3        QuorumPeerConfig qpConfig = new QuorumPeerConfig();
 155    3        qpConfig.parseProperties(zkProperties);
 156
 157             // Create and run the zookeeper instance
 158    3        final InternalZooKeeper zookeeper = new InternalZooKeeper();
 159    3        final ServerConfig zkConfig = new ServerConfig();
 160    3        zkConfig.readFrom(qpConfig);
 161
 162    3        ExecutorService executorService = Executors.newSingleThreadExecutor();
 163    3        executorService.execute(new Runnable() {
 164               @Override
 165               public void run() {
 166                 try {
 167    3              zookeeper.runFromConfig(zkConfig);
 168    0            } catch (IOException e) {
 169    0              throw new RuntimeException(e);
 170    3            }
 171    3          }
 172             });
 173             try {
 174    3          job.run(true);
 175             } finally {
 176    3          executorService.shutdown();
 177    3          zookeeper.end();
 178    3        }
 179
 180    3        return Files.readLines(new File(outputDir, "part-m-00000"),
 181                 Charsets.UTF_8);
 182           } finally {
 183    3        FileUtils.delete(tmpDir);
 184           }
 185         }
 186
 187
 188
 189         /**
 190          * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
 191          */
 192    6    private static class InternalZooKeeper extends ZooKeeperServerMain {
 193           /**
 194            * Shutdown the ZooKeeper instance.
 195            */
 196           void end() {
 197    3        shutdown();
 198    3      }
 199         }
 200       }
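
For context, the snippet below is a minimal sketch of how test code might invoke the class covered above. MyVertex, MyVertexInputFormat and MyVertexOutputFormat are hypothetical placeholders for a concrete vertex implementation and its input/output formats, and the tab-separated input lines are illustrative only; the call matches the run() overloads shown in the listing.

// Hypothetical usage sketch (not part of the covered source). MyVertex,
// MyVertexInputFormat and MyVertexOutputFormat stand in for real
// implementations supplied by the test.
import java.util.HashMap;
import java.util.Map;

import org.apache.giraph.utils.InternalVertexRunner;

public class MyVertexTest {
  public void testMyVertex() throws Exception {
    // Extra parameters are copied verbatim into the Hadoop Configuration by run()
    Map<String, String> params = new HashMap<String, String>();
    params.put("my.custom.parameter", "3");

    // Three-argument overload; it delegates to the combiner overload with a null combiner
    Iterable<String> results = InternalVertexRunner.run(
        MyVertex.class,
        MyVertexInputFormat.class,
        MyVertexOutputFormat.class,
        params,
        "1\t2",   // line-wise input data, one line per vertex
        "2\t3");

    // run() returns the job output line-wise, read back from part-m-00000
    for (String line : results) {
      System.out.println(line);
    }
  }
}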