Coverage Report - org.apache.giraph.examples.RandomWalkWorkerContext
 
Classes in this File Line Coverage Branch Coverage Complexity
RandomWalkWorkerContext
0%
0/44
0%
0/12
2.1
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.examples;
 20  
 
 21  
 import java.io.BufferedReader;
 22  
 import java.io.IOException;
 23  
 import java.io.InputStreamReader;
 24  
 import java.nio.charset.Charset;
 25  
 import java.util.Set;
 26  
 
 27  
 import org.apache.giraph.worker.WorkerContext;
 28  
 import org.apache.hadoop.conf.Configuration;
 29  
 import org.apache.hadoop.filecache.DistributedCache;
 30  
 import org.apache.hadoop.fs.FileSystem;
 31  
 import org.apache.hadoop.fs.Path;
 32  
 import org.apache.log4j.Logger;
 33  
 
 34  
 import com.google.common.collect.ImmutableSet;
 35  
 
 36  
 /**
 37  
  * Worker context for random walks.
 38  
  */
 39  0
 public class RandomWalkWorkerContext extends WorkerContext {
 40  
   /** Default maximum number of iterations */
 41  
   private static final int DEFAULT_MAX_SUPERSTEPS = 30;
 42  
   /** Default teleportation probability */
 43  
   private static final float DEFAULT_TELEPORTATION_PROBABILITY = 0.15f;
 44  
   /** Maximum number of iterations */
 45  
   private static int MAX_SUPERSTEPS;
 46  
   /** Teleportation probability */
 47  
   private static double TELEPORTATION_PROBABILITY;
 48  
   /** Preference vector */
 49  
   private static Set<Long> SOURCES;
 50  
 
 51  
   /** Configuration parameter for the source vertex */
 52  0
   private static final String SOURCE_VERTEX =
 53  0
       RandomWalkWithRestartComputation.class.getName() + ".sourceVertex";
 54  
 
 55  
   /** Logger */
 56  0
   private static final Logger LOG = Logger
 57  0
       .getLogger(RandomWalkWorkerContext.class);
 58  
 
 59  
   /**
 60  
    * @return The maximum number of iterations to perform.
 61  
    */
 62  
   public int getMaxSupersteps() {
 63  0
     if (MAX_SUPERSTEPS == 0) {
 64  0
       throw new IllegalStateException(
 65  0
           RandomWalkWorkerContext.class.getSimpleName() +
 66  
               " was not initialized. Relaunch your job " +
 67  
               "by setting the appropriate WorkerContext");
 68  
     }
 69  0
     return MAX_SUPERSTEPS;
 70  
   }
 71  
 
 72  
   /**
 73  
    * @return The teleportation probability.
 74  
    */
 75  
   public double getTeleportationProbability() {
 76  0
     if (TELEPORTATION_PROBABILITY == 0) {
 77  0
       throw new IllegalStateException(
 78  0
           RandomWalkWorkerContext.class.getSimpleName() +
 79  
               " was not initialized. Relaunch your job " +
 80  
               "by setting the appropriate WorkerContext");
 81  
     }
 82  0
     return TELEPORTATION_PROBABILITY;
 83  
   }
 84  
 
 85  
   /**
 86  
    * Checks if a vertex is a source.
 87  
    * @param id The vertex ID to check.
 88  
    * @return True if the vertex is a source in the preference vector.
 89  
    */
 90  
   public boolean isSource(long id) {
 91  0
     return SOURCES.contains(id);
 92  
   }
 93  
 
 94  
   /**
 95  
    * @return The number of sources in the preference vector.
 96  
    */
 97  
   public int numSources() {
 98  0
     return SOURCES.size();
 99  
   }
 100  
 
 101  
   /**
 102  
    * Initialize sources for Random Walk with Restart. First option
 103  
    * (preferential) is single source given from the command line as a parameter.
 104  
    * Second option is a file with a list of vertex IDs, one per line. In this
 105  
    * second case the preference vector is a uniform distribution over these
 106  
    * vertexes.
 107  
    * @param configuration The configuration.
 108  
    * @return a (possibly empty) set of source vertices
 109  
    */
 110  
   private ImmutableSet<Long> initializeSources(Configuration configuration) {
 111  0
     ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
 112  0
     long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
 113  0
     if (sourceVertex != Long.MIN_VALUE) {
 114  0
       return ImmutableSet.of(sourceVertex);
 115  
     } else {
 116  0
       Path sourceFile = null;
 117  
       try {
 118  
 
 119  0
         Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
 120  0
         if (cacheFiles == null || cacheFiles.length == 0) {
 121  
           // empty set if no source vertices configured
 122  0
           return ImmutableSet.of();
 123  
         }
 124  
 
 125  0
         sourceFile = cacheFiles[0];
 126  0
         FileSystem fs = FileSystem.getLocal(configuration);
 127  0
         BufferedReader in = new BufferedReader(new InputStreamReader(
 128  0
             fs.open(sourceFile), Charset.defaultCharset()));
 129  
         String line;
 130  0
         while ((line = in.readLine()) != null) {
 131  0
           builder.add(Long.parseLong(line));
 132  
         }
 133  0
         in.close();
 134  0
       } catch (IOException e) {
 135  0
         getContext().setStatus(
 136  
             "Could not load local cache files: " + sourceFile);
 137  0
         LOG.error("Could not load local cache files: " + sourceFile, e);
 138  0
       }
 139  
     }
 140  0
     return builder.build();
 141  
   }
 142  
 
 143  
   @Override
 144  
   public void preApplication() throws InstantiationException,
 145  
       IllegalAccessException {
 146  0
     setStaticVars(getContext().getConfiguration());
 147  0
   }
 148  
 
 149  
   /**
 150  
    * Set static variables from Configuration
 151  
    *
 152  
    * @param configuration the conf
 153  
    */
 154  
   private void setStaticVars(Configuration configuration) {
 155  0
     MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS,
 156  
         DEFAULT_MAX_SUPERSTEPS);
 157  0
     TELEPORTATION_PROBABILITY = configuration.getFloat(
 158  
         RandomWalkComputation.TELEPORTATION_PROBABILITY,
 159  
         DEFAULT_TELEPORTATION_PROBABILITY);
 160  0
     SOURCES = initializeSources(configuration);
 161  0
   }
 162  
 
 163  
   @Override
 164  
   public void preSuperstep() {
 165  0
   }
 166  
 
 167  
   @Override
 168  
   public void postSuperstep() {
 169  0
   }
 170  
 
 171  
   @Override
 172  
   public void postApplication() {
 173  0
   }
 174  
 }