This page last changed on Jul 29, 2009 by aidan.

Notes:

Objectives

  1. Implement ProtocolEngine and NetworkDriver as defined in "New common network and protocol interfaces"
  2. Isolate MINA dependencies into MINANetworkDriver
  3. No changes to threading model
  4. No changes to configuration file

Overview of new implementation

This first phase confines the MINA dependency behind the NetworkDriver interface. Compared with the 0.5 implementation described in the following diagram, the flow of control does not change significantly. The only significant change is that protocol processing is decoupled from network processing.

Work required

Add QpidByteBuffer

Class | Method | Change
org.apache.mina.common.ByteBuffer | all | At a number of points in the code we rely on behaviour present in MINA ByteBuffers but not in java.nio.ByteBuffers, in particular the auto-expanding functionality. To avoid rewriting those parts at this stage, we should import the MINA ByteBuffer class as QpidByteBuffer and continue to use it as is. Note that this functionality is unrelated to the network buffers: QpidByteBuffers are the output data structure. The ProtocolEngine will convert the MINA or java.nio.ByteBuffers used by the existing implementations to QpidByteBuffers after decoding.
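
To illustrate the auto-expanding behaviour that a plain java.nio.ByteBuffer lacks (it throws BufferOverflowException once full), here is a minimal sketch. ExpandingBufferSketch and its methods are hypothetical names used only for illustration; this is neither the MINA class nor the proposed QpidByteBuffer.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in that illustrates auto-expansion; not the MINA class itself. */
final class ExpandingBufferSketch {
    private ByteBuffer buf;

    ExpandingBufferSketch(int initialCapacity) {
        buf = ByteBuffer.allocate(initialCapacity);
    }

    /** Appends bytes, growing the backing buffer when it would otherwise overflow. */
    ExpandingBufferSketch put(byte[] src) {
        ensureRemaining(src.length);
        buf.put(src);
        return this;
    }

    private void ensureRemaining(int needed) {
        if (buf.remaining() >= needed) {
            return;
        }
        // Grow to at least double the capacity, or to exactly what is needed if larger.
        int newCapacity = Math.max(buf.capacity() * 2, buf.position() + needed);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        buf.flip();       // switch to reading what has been written so far
        bigger.put(buf);  // copy it into the larger buffer
        buf = bigger;
    }

    int position() { return buf.position(); }

    int capacity() { return buf.capacity(); }

    /** Read-only view of the bytes written so far, as a plain java.nio.ByteBuffer. */
    ByteBuffer toNioBuffer() {
        ByteBuffer view = buf.duplicate();
        view.flip();
        return view.asReadOnlyBuffer();
    }
}
```

Code that writes frames of unknown size can keep calling put() without pre-computing a capacity, which is the behaviour the existing code relies on.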

Implement NetworkDriver

MINANetworkDriver class

/**
 * This class wraps the existing MINA implementation behind the NetworkDriver interface. It uses
 * the AsynchWritePoolingFilter to buffer network writes so that send() is not synchronous.
 */
public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{ 
   /**
    * Creates a concrete NetworkDriver which opens a connection to the specified address on the port
    * and starts the given ProtocolEngine instance using MINA
    */
   static NetworkDriver open (int port, InetAddress destination, ProtocolEngine engine, 
                  NetworkDriverConfiguration config);  
  
   /**
    * Listens for incoming connections on the specified port and IPaddresses. Creates a new concrete
    * NetworkDriver for each incoming connection and creates a ProtocolEngine with the given factory
    */
   static NetworkDriver bind (int port, InetAddress[] addresses, ProtocolEngineFactory factory,  
                  NetworkDriverConfiguration config);  
 
   SocketAddress getRemoteAddress() 
 
   /**
    * The length of time after which the ProtocolEngine's readerIdle() method should be called if no data has been 
    * read
    */
   void setMaxReadIdle(int idleTime) 

   /**
    * The length of time after which the ProtocolEngine's writerIdle() method should be called if no data has been 
    * written
    */ 
   void setMaxWriteIdle(int idleTime) 
   
   /**
    * Adds the data to an Event through AsynchWritePoolingFilter for writing to MINA
    */
   void send(java.nio.ByteBuffer data)

}

Changes required to implement MINANetworkDriver

MINANetworkDriver replaces AMQPFastProtocolHandler, and most of its implementation will be taken from that class. bind() will be taken from the existing org.apache.qpid.server.Main.

Class | Method | Change
AMQPFastProtocolHandler | general | Renamed to MINANetworkDriver; implements the NetworkDriver interface
AMQPFastProtocolHandler | exceptionCaught | AMQP functionality removed; passes the exception on to ProtocolEngine
AMQPFastProtocolHandler | messageReceived | Calls ProtocolEngine.received()
AMQPFastProtocolHandler | sessionClosed | Calls ProtocolEngine.closed()
AMQPFastProtocolHandler | sessionCreated | Creates a new ProtocolEngine from ProtocolEngineFactory; other AMQP functionality moved to AMQProtocolEngine. Configuration taken from NetworkDriverConfiguration
AMQPFastProtocolHandler | sessionIdle | Calls ProtocolEngine.readerIdle() or ProtocolEngine.writerIdle() as appropriate
AMQMinaProtocolSession | AMQMinaProtocolSession | Thread pool configuration moved to NetworkDriver
AMQMinaProtocolSession | initHeartbeats | Moved to NetworkDriver.setMaxReadIdle() and NetworkDriver.setMaxWriteIdle()
AMQMinaProtocolSession | getRemoteAddress | Moved to NetworkDriver.getRemoteAddress()
Main | bind | Moved to NetworkDriver
Main | startup | Network configuration moved to NetworkDriver; replaced with construction of NetworkDriver
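
The delegation described in the table could look roughly like the sketch below. It is deliberately MINA-free so that it stands alone: the callback names mirror the handler methods listed above, but their parameters are simplified, and IdleSide, DriverCallbacksSketch and ProtocolEngine.exception() are hypothetical names introduced for illustration only.

```java
import java.nio.ByteBuffer;

/** Subset of the ProtocolEngine methods named on this page. */
interface ProtocolEngine {
    void received(ByteBuffer data);
    void closed();
    void readerIdle();
    void writerIdle();
    void exception(Throwable t); // hypothetical name; the page only says exceptions are passed on
}

interface ProtocolEngineFactory {
    ProtocolEngine newProtocolEngine();
}

/** Hypothetical flag standing in for the idle status reported by the network layer. */
enum IdleSide { READER, WRITER }

/**
 * Simplified sketch of the driver callbacks: each one forwards to the engine
 * and holds no AMQP logic of its own.
 */
final class DriverCallbacksSketch {
    private final ProtocolEngineFactory factory;
    private ProtocolEngine engine;

    DriverCallbacksSketch(ProtocolEngineFactory factory) {
        this.factory = factory;
    }

    // One engine is created per connection when the session starts.
    void sessionCreated() { engine = factory.newProtocolEngine(); }

    void messageReceived(ByteBuffer data) { engine.received(data); }

    void sessionClosed() { engine.closed(); }

    void exceptionCaught(Throwable cause) { engine.exception(cause); }

    void sessionIdle(IdleSide side) {
        if (side == IdleSide.READER) {
            engine.readerIdle();
        } else {
            engine.writerIdle();
        }
    }
}
```

Keeping the callbacks this thin is what allows a different network layer to be substituted later without touching the AMQP code.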

Implementation of ProtocolEngine

/***
 * This class replaces the existing AMQMinaProtocolSession and implements the ProtocolEngine interface. It is 
 * responsible for processing bytes into frames and for processing those frames. It is also responsible for 
 * turning frames into bytes and passing them onto the NetworkDriver for writing. 
 */
public class AMQProtocolEngine implements ProtocolEngine
{ 
   /**
    * Processes data. It uses AMQDecoder to decode the frame and place it into a Job for later processing
    */
   void received(java.nio.ByteBuffer data)
   
   void setNetworkDriver (NetworkDriver driver) 
 
   /**
    * calls NetworkDriver.getRemoteAddress()
    */
   SocketAddress getRemoteAddress() 
 
   long getWrittenBytes() 
 
   long getReadBytes() 
 
   /**
    * Closes the protocol engine, aborts in-progress transactions etc. 
    */
   void closed()  
  
   /**
    * Generates a heartbeat frame
    */
   void writerIdle() 

   /**
    * Closes the connection
    */
   void readerIdle() 
 
} 
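
Because received() is handed raw bytes, the engine has to cope with frames that arrive split across several calls. The following is a minimal sketch of that accumulation, assuming the AMQP 0-8/0-9 frame layout (type octet, 2-byte channel, 4-byte payload size, payload, frame-end octet 0xCE). FrameSketch and CumulativeDecoderSketch are hypothetical names; this is not the AMQDecoder implementation, and the hand-off to a Job is omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoded frame holder; the real broker uses its own frame classes. */
final class FrameSketch {
    final int type;
    final int channel;
    final byte[] payload;

    FrameSketch(int type, int channel, byte[] payload) {
        this.type = type;
        this.channel = channel;
        this.payload = payload;
    }
}

/** Accumulates bytes across decode() calls and emits only complete frames. */
final class CumulativeDecoderSketch {
    private static final int HEADER_SIZE = 7;   // type(1) + channel(2) + size(4)
    private static final int FRAME_END = 0xCE;  // frame-end octet

    private ByteBuffer pending = ByteBuffer.allocate(0); // leftover bytes, ready to read

    List<FrameSketch> decode(ByteBuffer data) {
        // Join any leftover bytes from earlier calls with the new data.
        ByteBuffer joined = ByteBuffer.allocate(pending.remaining() + data.remaining());
        joined.put(pending).put(data);
        joined.flip();

        List<FrameSketch> frames = new ArrayList<FrameSketch>();
        while (joined.remaining() >= HEADER_SIZE) {
            joined.mark();
            int type = joined.get() & 0xFF;
            int channel = joined.getShort() & 0xFFFF;
            long size = joined.getInt() & 0xFFFFFFFFL;
            if (joined.remaining() < size + 1) {
                joined.reset(); // frame is incomplete; wait for more bytes
                break;
            }
            byte[] payload = new byte[(int) size];
            joined.get(payload);
            if ((joined.get() & 0xFF) != FRAME_END) {
                throw new IllegalStateException("Missing frame-end octet");
            }
            frames.add(new FrameSketch(type, channel, payload));
        }
        pending = joined.slice(); // unread bytes, if any, are kept for the next call
        return frames;
    }
}
```

In the design described here, each decoded frame would then be placed on a Job so that it is processed outside the network thread.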

Changes required to implement AMQProtocolEngine

Class | Method | Change
AMQMinaProtocolSession | Whole class | Rename to AMQProtocolEngine. Decouple the AMQP semantics from the underlying networking, for which it should now use MINANetworkDriver. This class will implement the ProtocolEngine interface and become the central point for processing AMQP frames. It will use a Job to hold ReceivedEvents for processing outside of the network thread.
AMQMinaProtocolSession | received | Implement, using AMQDecoder to decode the byte stream before placing the result onto a Job for processing
AMQProtocolSessionMBean | | Needs to be updated to use AMQProtocolSession without relying on the underlying implementation
Event, Job | | Created by AMQProtocolEngine.received() rather than AsynchReadPoolingFilter
AMQDecoder | | Should no longer extend CumulativeProtocolDecoder
AMQPFastProtocolHandler | | AMQP-related functionality moved to AMQProtocolEngine
AMQPProtocolProvider | | Removed

public class AMQProtocolEngineFactory implements ProtocolEngineFactory  
{ 
 
  // Returns a new instance of an AMQProtocolEngine 
  ProtocolEngine newProtocolEngine() 
   
} 

Changes required to implement AMQProtocolEngineFactory

No existing code needs to be changed here. The factory will simply create an AMQProtocolEngine and return it.

Implement NetworkConfiguration

Class | Method | Change
ServerConfiguration | getNetworkConfiguration | New method; needs to construct and return network configuration information

Transport layer selection in o.a.q.server.Main

Currently the broker's Main method contains setup code specific to MINA.

Main needs to be modified to remove the processing of MINA-specific options (NIO, MultiIO, executor pool, etc.). This should be replaced with the creation of a NetworkDriver instance, with its configuration picked up and applied in the same way as for other subsystems, i.e. from ServerConfiguration.

Class | Method | Change
Main | bind | Moved to NetworkDriver
Main | startup | Network configuration moved to NetworkDriver; replaced with construction of NetworkDriver and a call to MINANetworkDriver.bind
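
As a rough sketch of the intended shape of startup after this change, Main would only read the network settings and hand them to the driver. The getters on NetworkDriverConfiguration, the NetworkBinder interface and BrokerStartupSketch are assumptions made for illustration; the page does not define them, and a real bind would open sockets via MINA.

```java
import java.net.InetAddress;

/** Hypothetical shape for the network settings; the page does not list its getters. */
interface NetworkDriverConfiguration {
    int getPort();
    InetAddress[] getBindAddresses();
}

interface ProtocolEngine { }

interface ProtocolEngineFactory {
    ProtocolEngine newProtocolEngine();
}

/** Stand-in for the driver's bind entry point, so the sketch runs without MINA. */
interface NetworkBinder {
    void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
              NetworkDriverConfiguration config);
}

final class BrokerStartupSketch {
    /** Startup holds no transport-specific option handling; it only wires the pieces together. */
    static void startup(NetworkDriverConfiguration config, ProtocolEngineFactory factory,
                        NetworkBinder binder) {
        binder.bind(config.getPort(), config.getBindAddresses(), factory, config);
    }
}
```

With this arrangement, choosing a different transport means supplying a different binder rather than editing Main.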

Document generated by Confluence on May 26, 2010 10:32