Coverage Report - org.apache.giraph.comm.RequestDecoder
 
Classes in this File Line Coverage Branch Coverage Complexity
RequestDecoder
81%
18/22
37%
3/8
3.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import org.apache.hadoop.conf.Configuration;
 22  
 import org.apache.hadoop.io.Writable;
 23  
 import org.apache.hadoop.io.WritableComparable;
 24  
 import org.apache.hadoop.util.ReflectionUtils;
 25  
 import org.apache.log4j.Logger;
 26  
 import org.jboss.netty.buffer.ChannelBuffer;
 27  
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 28  
 import org.jboss.netty.channel.Channel;
 29  
 import org.jboss.netty.channel.ChannelHandlerContext;
 30  
 import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
 31  
 
 32  
 /**
 33  
  * Decodes encoded requests from the client.
 34  
  *
 35  
  * @param <I> Vertex id
 36  
  * @param <V> Vertex data
 37  
  * @param <E> Edge data
 38  
  * @param <M> Message data
 39  
  */
 40  
 @SuppressWarnings("rawtypes")
 41  
 public class RequestDecoder<I extends WritableComparable,
 42  
     V extends Writable, E extends Writable,
 43  
     M extends Writable>  extends OneToOneDecoder {
 44  
   /** Class logger */
 45  1
   private static final Logger LOG =
 46  
       Logger.getLogger(RequestDecoder.class);
 47  
   /** Configuration */
 48  
   private final Configuration conf;
 49  
   /** Registry of requests */
 50  
   private final RequestRegistry requestRegistry;
 51  
   /** Byte counter to output */
 52  
   private final ByteCounter byteCounter;
 53  
 
 54  
   /**
 55  
    * Constructor.
 56  
    *
 57  
    * @param conf Configuration
 58  
    * @param requestRegistry Request registry
 59  
    * @param byteCounter Keeps track of the decoded bytes
 60  
    */
 61  
   public RequestDecoder(Configuration conf, RequestRegistry requestRegistry,
 62  11
                         ByteCounter byteCounter) {
 63  11
     this.conf = conf;
 64  11
     this.requestRegistry = requestRegistry;
 65  11
     this.byteCounter = byteCounter;
 66  11
   }
 67  
 
 68  
   @Override
 69  
   protected Object decode(ChannelHandlerContext ctx,
 70  
       Channel channel, Object msg) throws Exception {
 71  3
     if (!(msg instanceof ChannelBuffer)) {
 72  0
       throw new IllegalStateException("decode: Got illegal message " + msg);
 73  
     }
 74  
 
 75  
     // Output metrics every 1/2 minute
 76  3
     String metrics = byteCounter.getMetricsWindow(30000);
 77  3
     if (metrics != null) {
 78  0
       if (LOG.isInfoEnabled()) {
 79  0
         LOG.info("decode: Server window metrics " + metrics);
 80  
       }
 81  
     }
 82  
 
 83  3
     ChannelBuffer buffer = (ChannelBuffer) msg;
 84  3
     ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
 85  3
     int enumValue = inputStream.readByte();
 86  3
     RequestRegistry.Type type = RequestRegistry.Type.values()[enumValue];
 87  3
     if (LOG.isDebugEnabled()) {
 88  0
       LOG.debug("decode: Got a request of type " + type);
 89  
     }
 90  
     @SuppressWarnings("unchecked")
 91  3
     Class<? extends WritableRequest<I, V, E, M>> writableRequestClass =
 92  
         (Class<? extends WritableRequest<I, V, E, M>>)
 93  
         requestRegistry.getClass(type);
 94  3
     WritableRequest<I, V, E, M> writableRequest =
 95  
         ReflectionUtils.newInstance(writableRequestClass, conf);
 96  3
     writableRequest.readFields(inputStream);
 97  3
     return writableRequest;
 98  
   }
 99  
 }