Coverage Report - org.apache.giraph.graph.MasterThread
 
Classes in this File Line Coverage Branch Coverage Complexity
MasterThread
83%
52/62
64%
18/28
11
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.graph;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.Map;
 23  
 import java.util.Map.Entry;
 24  
 import java.util.TreeMap;
 25  
 
 26  
 import org.apache.giraph.bsp.ApplicationState;
 27  
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 28  
 import org.apache.giraph.bsp.SuperstepState;
 29  
 import org.apache.hadoop.io.Writable;
 30  
 import org.apache.hadoop.io.WritableComparable;
 31  
 import org.apache.hadoop.mapreduce.Mapper.Context;
 32  
 import org.apache.log4j.Logger;
 33  
 import org.apache.zookeeper.KeeperException;
 34  
 
 35  
 /**
 36  
  * Master thread that will coordinate the activities of the tasks.  It runs
 37  
  * on all task processes, however, will only execute its algorithm if it knows
 38  
  * it is the "leader" from ZooKeeper.
 39  
  *
 40  
  * @param <I> Vertex id
 41  
  * @param <V> Vertex value
 42  
  * @param <E> Edge value
 43  
  * @param <M> Message data
 44  
  */
 45  
 @SuppressWarnings("rawtypes")
 46  
 public class MasterThread<I extends WritableComparable, V extends Writable,
 47  
     E extends Writable, M extends Writable> extends Thread {
 48  
   /** Counter group name for the Giraph timers */
 49  
   public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
 50  
   /** Class logger */
 51  1
   private static final Logger LOG = Logger.getLogger(MasterThread.class);
 52  
   /** Reference to shared BspService */
 53  24
   private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
 54  
   /** Context (for counters) */
 55  
   private final Context context;
 56  
   /** Use superstep counters? */
 57  
   private final boolean superstepCounterOn;
 58  
   /** Setup seconds */
 59  24
   private double setupSecs = 0d;
 60  
   /** Superstep timer (in seconds) map */
 61  24
   private final Map<Long, Double> superstepSecsMap =
 62  
       new TreeMap<Long, Double>();
 63  
 
 64  
   /**
 65  
    * Constructor.
 66  
    *
 67  
    * @param bspServiceMaster Master that already exists and setup() has
 68  
    *        been called.
 69  
    * @param context Context from the Mapper.
 70  
    */
 71  
   MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
 72  
       Context context) {
 73  24
     super(MasterThread.class.getName());
 74  24
     this.bspServiceMaster = bspServiceMaster;
 75  24
     this.context = context;
 76  24
     superstepCounterOn = context.getConfiguration().getBoolean(
 77  
         GiraphJob.USE_SUPERSTEP_COUNTERS,
 78  
         GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
 79  24
   }
 80  
 
 81  
   /**
 82  
    * The master algorithm.  The algorithm should be able to withstand
 83  
    * failures and resume as necessary since the master may switch during a
 84  
    * job.
 85  
    */
 86  
   @Override
 87  
   public void run() {
 88  
     // Algorithm:
 89  
     // 1. Become the master
 90  
     // 2. If desired, restart from a manual checkpoint
 91  
     // 3. Run all supersteps until complete
 92  
     try {
 93  24
       long startMillis = System.currentTimeMillis();
 94  24
       long endMillis = 0;
 95  24
       bspServiceMaster.setup();
 96  24
       if (bspServiceMaster.becomeMaster()) {
 97  
         // Attempt to create InputSplits if necessary. Bail out if that fails.
 98  24
         if (bspServiceMaster.getRestartedSuperstep() !=
 99  
             BspService.UNSET_SUPERSTEP ||
 100  
             bspServiceMaster.createInputSplits() != -1) {
 101  24
           long setupMillis = System.currentTimeMillis() - startMillis;
 102  24
           context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
 103  
               "Setup (milliseconds)").
 104  
               increment(setupMillis);
 105  24
           setupSecs = setupMillis / 1000.0d;
 106  24
           SuperstepState superstepState = SuperstepState.INITIAL;
 107  24
           long cachedSuperstep = BspService.UNSET_SUPERSTEP;
 108  242
           while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
 109  218
             long startSuperstepMillis = System.currentTimeMillis();
 110  218
             cachedSuperstep = bspServiceMaster.getSuperstep();
 111  218
             superstepState = bspServiceMaster.coordinateSuperstep();
 112  218
             long superstepMillis = System.currentTimeMillis() -
 113  
                 startSuperstepMillis;
 114  218
             superstepSecsMap.put(Long.valueOf(cachedSuperstep),
 115  
                 superstepMillis / 1000.0d);
 116  218
             if (LOG.isInfoEnabled()) {
 117  218
               LOG.info("masterThread: Coordination of superstep " +
 118  
                   cachedSuperstep + " took " +
 119  
                   superstepMillis / 1000.0d +
 120  
                   " seconds ended with state " + superstepState +
 121  
                   " and is now on superstep " +
 122  
                   bspServiceMaster.getSuperstep());
 123  
             }
 124  218
             if (superstepCounterOn) {
 125  
               String counterPrefix;
 126  218
               if (cachedSuperstep == -1) {
 127  24
                 counterPrefix = "Vertex input superstep";
 128  
               } else {
 129  194
                 counterPrefix = "Superstep " + cachedSuperstep;
 130  
               }
 131  218
               context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
 132  
                   counterPrefix +
 133  
                   " (milliseconds)").
 134  
                   increment(superstepMillis);
 135  
             }
 136  
 
 137  
             // If a worker failed, restart from a known good superstep
 138  218
             if (superstepState == SuperstepState.WORKER_FAILURE) {
 139  0
               bspServiceMaster.restartFromCheckpoint(
 140  
                   bspServiceMaster.getLastGoodCheckpoint());
 141  
             }
 142  218
             endMillis = System.currentTimeMillis();
 143  218
           }
 144  24
           bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
 145  
         }
 146  
       }
 147  24
       bspServiceMaster.cleanup();
 148  24
       if (!superstepSecsMap.isEmpty()) {
 149  24
         context.getCounter(
 150  
             GIRAPH_TIMERS_COUNTER_GROUP_NAME,
 151  
             "Shutdown (milliseconds)").
 152  
             increment(System.currentTimeMillis() - endMillis);
 153  24
         if (LOG.isInfoEnabled()) {
 154  24
           LOG.info("setup: Took " + setupSecs + " seconds.");
 155  
         }
 156  24
         for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
 157  218
           if (LOG.isInfoEnabled()) {
 158  218
             if (entry.getKey().longValue() ==
 159  
                 BspService.INPUT_SUPERSTEP) {
 160  24
               LOG.info("vertex input superstep: Took " +
 161  
                   entry.getValue() + " seconds.");
 162  
             } else {
 163  194
               LOG.info("superstep " + entry.getKey() + ": Took " +
 164  
                   entry.getValue() + " seconds.");
 165  
             }
 166  
           }
 167  
         }
 168  24
         if (LOG.isInfoEnabled()) {
 169  24
           LOG.info("shutdown: Took " +
 170  
               (System.currentTimeMillis() - endMillis) /
 171  
               1000.0d + " seconds.");
 172  24
           LOG.info("total: Took " +
 173  
               ((System.currentTimeMillis() - startMillis) /
 174  
               1000.0d) + " seconds.");
 175  
         }
 176  24
         context.getCounter(
 177  
             GIRAPH_TIMERS_COUNTER_GROUP_NAME,
 178  
             "Total (milliseconds)").
 179  
             increment(System.currentTimeMillis() - startMillis);
 180  
       }
 181  0
     } catch (IOException e) {
 182  0
       LOG.error("masterThread: Master algorithm failed with " +
 183  
           "IOException ", e);
 184  0
       throw new IllegalStateException(e);
 185  0
     } catch (InterruptedException e) {
 186  0
       LOG.error("masterThread: Master algorithm failed with " +
 187  
           "InterruptedException", e);
 188  0
       throw new IllegalStateException(e);
 189  0
     } catch (KeeperException e) {
 190  0
       LOG.error("masterThread: Master algorithm failed with " +
 191  
           "KeeperException", e);
 192  0
       throw new IllegalStateException(e);
 193  24
     }
 194  24
   }
 195  
 }