Coverage Report - org.apache.giraph.block_app.migration.MigrationMasterCompute
 
Classes in this File Line Coverage Branch Coverage Complexity
MigrationMasterCompute
0%
0/25
N/A
1.5
MigrationMasterCompute$MigrationFullMasterCompute
0%
0/47
0%
0/20
1.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.migration;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 
 24  
 import org.apache.giraph.aggregators.Aggregator;
 25  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 26  
 import org.apache.giraph.block_app.framework.api.StatusReporter;
 27  
 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
 28  
 import org.apache.giraph.combiner.MessageCombiner;
 29  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 30  
 import org.apache.giraph.conf.TypesHolder;
 31  
 import org.apache.giraph.reducers.ReduceOperation;
 32  
 import org.apache.giraph.utils.ReflectionUtils;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 
 35  
 /**
 36  
  * Replacement for MasterCompute when migrating to Blocks Framework,
 37  
  * disallowing functions that are tied to execution order.
 38  
  */
 39  
 @SuppressWarnings({"unchecked", "rawtypes"})
 40  0
 public abstract class MigrationMasterCompute
 41  
     extends DefaultImmutableClassesGiraphConfigurable implements Writable {
 42  
   private BlockMasterApi api;
 43  
 
 44  
   final void init(BlockMasterApi masterApi) {
 45  0
     this.api = masterApi;
 46  0
     setConf(masterApi.getConf());
 47  0
   }
 48  
 
 49  
   @Override
 50  
   public void readFields(DataInput in) throws IOException {
 51  0
   }
 52  
 
 53  
   @Override
 54  
   public void write(DataOutput out) throws IOException {
 55  0
   }
 56  
 
 57  
   public void compute() {
 58  0
   }
 59  
 
 60  
   public void initialize() throws InstantiationException,
 61  
       IllegalAccessException {
 62  0
   }
 63  
 
 64  
   @SuppressWarnings("deprecation")
 65  
   public long getTotalNumVertices() {
 66  0
     return api.getTotalNumVertices();
 67  
   }
 68  
 
 69  
   @SuppressWarnings("deprecation")
 70  
   public long getTotalNumEdges() {
 71  0
     return api.getTotalNumEdges();
 72  
   }
 73  
 
 74  
   public final <S, R extends Writable> void registerReducer(
 75  
       String name, ReduceOperation<S, R> reduceOp) {
 76  0
     api.registerReducer(name, reduceOp);
 77  0
   }
 78  
 
 79  
   public final <S, R extends Writable> void registerReducer(
 80  
       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
 81  0
     api.registerReducer(
 82  
         name, reduceOp, globalInitialValue);
 83  0
   }
 84  
 
 85  
   public final <T extends Writable> T getReduced(String name) {
 86  0
     return api.getReduced(name);
 87  
   }
 88  
 
 89  
   public final void broadcast(String name, Writable object) {
 90  0
     api.broadcast(name, object);
 91  0
   }
 92  
 
 93  
   public final <A extends Writable> boolean registerAggregator(
 94  
     String name, Class<? extends Aggregator<A>> aggregatorClass)
 95  
     throws InstantiationException, IllegalAccessException {
 96  0
     return api.registerAggregator(
 97  
         name, aggregatorClass);
 98  
   }
 99  
 
 100  
   @SuppressWarnings("deprecation")
 101  
   public final <A extends Writable> boolean registerPersistentAggregator(
 102  
       String name,
 103  
       Class<? extends Aggregator<A>> aggregatorClass) throws
 104  
       InstantiationException, IllegalAccessException {
 105  0
     return api.registerPersistentAggregator(name, aggregatorClass);
 106  
   }
 107  
 
 108  
   public final <A extends Writable> A getAggregatedValue(String name) {
 109  0
     return api.<A>getAggregatedValue(name);
 110  
   }
 111  
 
 112  
   public final <A extends Writable> void setAggregatedValue(
 113  
       String name, A value) {
 114  0
     api.setAggregatedValue(name, value);
 115  0
   }
 116  
 
 117  
   public final void logToCommandLine(String line) {
 118  0
     api.logToCommandLine(line);
 119  0
   }
 120  
 
 121  
   public final StatusReporter getContext() {
 122  0
     return api;
 123  
   }
 124  
 
 125  
   /**
 126  
    * Drop-in replacement for MasterCompute when migrating
 127  
    * to Blocks Framework.
 128  
    */
 129  0
   public static class MigrationFullMasterCompute
 130  
       extends MigrationMasterCompute {
 131  
     private long superstep;
 132  
     private boolean halt;
 133  
     private Class<? extends MigrationAbstractComputation> computationClass;
 134  
     private Class<? extends MigrationAbstractComputation> newComputationClass;
 135  
     private Class<? extends Writable> originalMessage;
 136  
     private Class<? extends Writable> newMessage;
 137  
     private Class<? extends MessageCombiner> originalMessageCombiner;
 138  
     private Class<? extends MessageCombiner> newMessageCombiner;
 139  
 
 140  
     final void init(
 141  
         long superstep,
 142  
         Class<? extends MigrationAbstractComputation> computationClass,
 143  
         Class<? extends Writable> message,
 144  
         Class<? extends MessageCombiner> messageCombiner) {
 145  0
       this.superstep = superstep;
 146  0
       this.halt = false;
 147  0
       this.computationClass = computationClass;
 148  0
       this.newComputationClass = null;
 149  0
       this.originalMessage = message;
 150  0
       this.newMessage = null;
 151  0
       this.originalMessageCombiner = messageCombiner;
 152  0
       this.newMessageCombiner = null;
 153  0
     }
 154  
 
 155  
     public final long getSuperstep() {
 156  0
       return superstep;
 157  
     }
 158  
 
 159  
     @Override
 160  
     public final long getTotalNumVertices() {
 161  0
       if (superstep == 0) {
 162  0
         throw new RuntimeException(
 163  
             "getTotalNumVertices not available in superstep=0");
 164  
       }
 165  0
       return super.getTotalNumVertices();
 166  
     }
 167  
 
 168  
     @Override
 169  
     public final long getTotalNumEdges() {
 170  0
       if (superstep == 0) {
 171  0
         throw new RuntimeException(
 172  
             "getTotalNumEdges not available in superstep=0");
 173  
       }
 174  0
       return super.getTotalNumEdges();
 175  
     }
 176  
 
 177  
 
 178  
     public final void haltComputation() {
 179  0
       halt = true;
 180  0
     }
 181  
 
 182  
     public final boolean isHalted() {
 183  0
       return halt;
 184  
     }
 185  
 
 186  
     public final void setComputation(
 187  
         Class<? extends MigrationFullAbstractComputation> computation) {
 188  0
       if (computation != null) {
 189  0
         newComputationClass = computation;
 190  
       } else {
 191  
         // TODO
 192  0
         this.computationClass = null;
 193  
       }
 194  0
     }
 195  
 
 196  
     public final
 197  
     Class<? extends MigrationAbstractComputation> getComputation() {
 198  0
       if (newComputationClass != null) {
 199  0
         return newComputationClass;
 200  
       }
 201  0
       if (computationClass != null) {
 202  0
         return computationClass;
 203  
       }
 204  0
       return null;
 205  
     }
 206  
 
 207  
     public final void setMessageCombiner(
 208  
         Class<? extends MessageCombiner> combinerClass) {
 209  0
       this.newMessageCombiner = combinerClass;
 210  0
     }
 211  
 
 212  
     public final Class<? extends MessageCombiner> getMessageCombiner() {
 213  0
       return newMessageCombiner != null ?
 214  
         newMessageCombiner : originalMessageCombiner;
 215  
     }
 216  
 
 217  
     public final void setIncomingMessage(
 218  
         Class<? extends Writable> incomingMessageClass) {
 219  0
       if (!originalMessage.equals(incomingMessageClass)) {
 220  0
         throw new IllegalArgumentException(
 221  
             originalMessage + " and " + incomingMessageClass + " must be same");
 222  
       }
 223  0
     }
 224  
 
 225  
     public final void setOutgoingMessage(
 226  
         Class<? extends Writable> outgoingMessageClass) {
 227  0
       newMessage = outgoingMessageClass;
 228  0
     }
 229  
 
 230  
     final Class<? extends Writable> getOutgoingMessage() {
 231  0
       if (newMessage != null) {
 232  0
         return newMessage;
 233  
       }
 234  
 
 235  0
       if (newComputationClass == null) {
 236  0
         return originalMessage;
 237  
       }
 238  0
       Class[] computationTypes = ReflectionUtils.getTypeArguments(
 239  
           TypesHolder.class, newComputationClass);
 240  0
       return computationTypes[4];
 241  
     }
 242  
 
 243  
     final Class<? extends MigrationAbstractComputation> getComputationClass() {
 244  0
       return newComputationClass != null ?
 245  
         newComputationClass : computationClass;
 246  
     }
 247  
 
 248  
     final
 249  
     Class<? extends MigrationAbstractComputation> getNewComputationClass() {
 250  0
       return newComputationClass;
 251  
     }
 252  
 
 253  
     final Class<? extends Writable> getNewMessage() {
 254  0
       return newMessage;
 255  
     }
 256  
 
 257  
     final Class<? extends MessageCombiner> getNewMessageCombiner() {
 258  0
       return newMessageCombiner;
 259  
     }
 260  
   }
 261  
 }