Coverage Report - org.apache.giraph.comm.ResponseClientHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
ResponseClientHandler
74%
23/31
50%
5/10
3.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm;
 20  
 
 21  
 import java.io.IOException;
 22  
 import java.util.concurrent.ConcurrentMap;
 23  
 
 24  
 import org.apache.log4j.Logger;
 25  
 import org.jboss.netty.buffer.ChannelBuffer;
 26  
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 27  
 import org.jboss.netty.channel.ChannelHandlerContext;
 28  
 import org.jboss.netty.channel.ChannelStateEvent;
 29  
 import org.jboss.netty.channel.ExceptionEvent;
 30  
 import org.jboss.netty.channel.MessageEvent;
 31  
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 32  
 
 33  
 /**
 34  
  * Generic handler of responses.
 35  
  */
 36  
 public class ResponseClientHandler extends SimpleChannelUpstreamHandler {
 37  
   /** Class logger */
 38  1
   private static final Logger LOG =
 39  
       Logger.getLogger(ResponseClientHandler.class);
 40  
   /** Outstanding request map */
 41  
   private final ConcurrentMap<Long, RequestInfo> outstandingRequestMap;
 42  
 
 43  
   /**
 44  
    * Constructor.
 45  
    *
 46  
    * @param outstandingRequestMap Map of outstanding requests
 47  
    */
 48  
   public ResponseClientHandler(
 49  11
       ConcurrentMap<Long, RequestInfo> outstandingRequestMap) {
 50  11
     this.outstandingRequestMap = outstandingRequestMap;
 51  11
   }
 52  
 
 53  
   @Override
 54  
   public void messageReceived(
 55  
       ChannelHandlerContext ctx, MessageEvent event) {
 56  3
     if (!(event.getMessage() instanceof ChannelBuffer)) {
 57  0
       throw new IllegalStateException("messageReceived: Got a " +
 58  
           "non-ChannelBuffer message " + event.getMessage());
 59  
     }
 60  3
     ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
 61  3
     ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer);
 62  3
     long requestId = -1;
 63  3
     int response = -1;
 64  
     try {
 65  3
       requestId = inputStream.readLong();
 66  3
       response = inputStream.readByte();
 67  3
       inputStream.close();
 68  0
     } catch (IOException e) {
 69  0
       throw new IllegalStateException(
 70  
           "messageReceived: Got IOException ", e);
 71  3
     }
 72  3
     if (response != 0) {
 73  0
       throw new IllegalStateException(
 74  
           "messageReceived: Got illegal response " + response);
 75  
     }
 76  
 
 77  3
     RequestInfo requestInfo = outstandingRequestMap.remove(requestId);
 78  3
     if (requestInfo == null) {
 79  0
       throw new IllegalStateException("messageReceived: Impossible to " +
 80  
           "have a non-registered requestId " + requestId);
 81  
     } else {
 82  3
       if (LOG.isDebugEnabled()) {
 83  0
         LOG.debug("messageReceived: Processed request id = " + requestId +
 84  
             " " + requestInfo + ".  Waiting on " +
 85  
             outstandingRequestMap.size() +
 86  
             " requests, bytes = " + buffer.capacity());
 87  
       }
 88  
     }
 89  
 
 90  
     // Help NettyClient#waitSomeRequests() to finish faster
 91  3
     synchronized (outstandingRequestMap) {
 92  3
       outstandingRequestMap.notifyAll();
 93  3
     }
 94  3
   }
 95  
 
 96  
   @Override
 97  
   public void channelClosed(ChannelHandlerContext ctx,
 98  
                             ChannelStateEvent e) throws Exception {
 99  11
     if (LOG.isDebugEnabled()) {
 100  0
       LOG.debug("channelClosed: Closed the channel on " +
 101  
           ctx.getChannel().getRemoteAddress());
 102  
     }
 103  11
   }
 104  
 
 105  
   @Override
 106  
   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
 107  0
     throw new IllegalStateException("exceptionCaught: Channel failed with " +
 108  
         "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
 109  
   }
 110  
 }