Coverage Report - org.apache.commons.messagelet.BridgeMDO
 
Classes in this File Line Coverage Branch Coverage Complexity
BridgeMDO
0%
0/135
0%
0/48
2.385
 
 1  
 /*
 2  
  * Copyright (C) The Apache Software Foundation. All rights reserved.
 3  
  *
 4  
  * This software is published under the terms of the Apache Software License
 5  
  * version 1.1, a copy of which has been included with this distribution in
 6  
  * the LICENSE file.
 7  
  * 
 8  
  * $Id: BridgeMDO.java 155459 2005-02-26 13:24:44Z dirkv $
 9  
  */
 10  
 package org.apache.commons.messagelet;
 11  
 
 12  
 import java.util.Enumeration;
 13  
 
 14  
 import javax.jms.BytesMessage;
 15  
 import javax.jms.Destination;
 16  
 import javax.jms.JMSException;
 17  
 import javax.jms.MapMessage;
 18  
 import javax.jms.Message;
 19  
 import javax.jms.ObjectMessage;
 20  
 import javax.jms.StreamMessage;
 21  
 import javax.jms.TextMessage;
 22  
 import javax.servlet.ServletException;
 23  
 
 24  
 import org.apache.commons.logging.Log;
 25  
 import org.apache.commons.logging.LogFactory;
 26  
 import org.apache.commons.messenger.Messenger;
 27  
 
 28  
 
 29  
 /** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
 30  
  * from one JMS destination and connection to another.
 31  
  * This allows messages to be consumed on one destination and sent to 
 32  
  * another JMS destination, using possibly different JMS providers.
 33  
  * For example this can be used to bridge from SpiritWave to MQSeries. 
 34  
  * </p>
 35  
  * <p>
 36  
  * This class is a useful base class to other possible bridge implementations
 37  
  * such as 2 phase commit bridges or bridges with some complex transformation etc.
 38  
  * This class has a number of Factory and Strategy methods to further customize
 39  
  * the acknowledgement and transaction management, the message construction, 
 40  
  * transformation and how to handle message headers etc.
 41  
  * </p>
 42  
  *
 43  
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 44  
  * @version $Revision: 155459 $
 45  
  */
 46  
 public class BridgeMDO extends MessengerMDO {
 47  
 
 48  
     /** Logger */
 49  0
     private static final Log log = LogFactory.getLog(BridgeMDO.class);
 50  
   
 51  
     /** the Messenger used to output messages */
 52  
     private Messenger outputMessenger;
 53  
       
 54  
     /** the Destination output messages will be sent to */
 55  
     private Destination outputDestination;
 56  
 
 57  
     /** the name of the messenger to use for output */      
 58  
     private String outputConnection;
 59  
     
 60  
     /** the name of the destination to use */
 61  
     private String outputSubject;
 62  
     
 63  
     /** the buffer size used for ByteMessage and StreamMessage copying */
 64  0
     private int bufferSize = 32 * 1024;
 65  
 
 66  
     /** should this MDO work in transacted mode */
 67  0
     private boolean transacted = false;
 68  
     
 69  
     
 70  0
     public BridgeMDO() {
 71  0
     }
 72  
     
 73  
     public void init() throws ServletException {
 74  
         try {
 75  0
             Messenger messenger = getMessenger();
 76  0
             Messenger outputMessenger = getOutputMessenger();
 77  
             
 78  0
             if ( messenger == null ) {
 79  0
                 throw new ServletException( "No input Messenger is defined for this Bridge" );
 80  
             }
 81  0
             if ( outputMessenger == null ) {
 82  0
                 throw new ServletException( "No output Messenger is defined for this Bridge" );
 83  
             }
 84  
             
 85  
             // enable transacted mode 
 86  0
             boolean tran1 = messenger.getSessionFactory().isTransacted();
 87  0
             boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
 88  
             
 89  0
             if ( tran1 != tran2 ) {
 90  0
                 throw new ServletException( 
 91  
                     "Both the input and output Messenger must have the same transacted mode. "
 92  
                     + "Input is: " + tran1 + " output is: " + tran2 
 93  
                 );
 94  
             }
 95  0
             transacted = tran1;
 96  
             
 97  
             // use client acknowledgement
 98  
             
 99  
             // ### This should be specified in the Messenger.xml file
 100  
             //messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
 101  
             //outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
 102  
 
 103  0
             validateOutputDestination();
 104  
             
 105  
         }
 106  0
         catch (JMSException e) {
 107  0
             log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
 108  0
             log.error( "Caught: " + e, e);
 109  0
             throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
 110  0
         }
 111  0
     }
 112  
     
 113  
     // MessageListener interface
 114  
     //-------------------------------------------------------------------------
 115  
     public void onMessage(Message message) {
 116  0
         Messenger messenger = getMessenger();
 117  
         
 118  
         try {
 119  0
             Message outputMessage = createOutputMessage(message);
 120  0
             if ( outputMessage != null ) {
 121  0
                 Destination destination = getOutputDestination();
 122  
                 
 123  0
                 if ( log.isDebugEnabled() ) {
 124  0
                     log.debug( "Sending message to: " + destination );
 125  
                 }
 126  
                 
 127  0
                 getOutputMessenger().send( destination, outputMessage );
 128  
             }
 129  0
             acknowledge(message);
 130  0
             acknowledge(outputMessage);
 131  0
             commit();
 132  
         }
 133  0
         catch (Exception e) {
 134  0
             log.error("Could not send message due to exception", e);
 135  0
             rollback();
 136  0
         }
 137  0
     }
 138  
     
 139  
     
 140  
     // Properties
 141  
     //-------------------------------------------------------------------------
 142  
     
 143  
     /** 
 144  
      * @return true if this MDO should work in transacted mode
 145  
      */
 146  
     public boolean isTransacted() {
 147  0
         return transacted;
 148  
     }
 149  
 
 150  
     /**
 151  
      * Sets whether this MDO should work in transacted mode
 152  
      */    
 153  
     public void setTransacted(boolean transacted) {
 154  0
         this.transacted = transacted;
 155  0
     }
 156  
     
 157  
         
 158  
     public String getOutputConnection() {
 159  0
         return outputConnection;
 160  
     }
 161  
 
 162  
     /**
 163  
      * Sets the connection name (messenger instance) to use
 164  
      * to output messages
 165  
      */    
 166  
     public void setOutputConnection(String outputConnection) {
 167  0
         this.outputConnection = outputConnection;
 168  0
     }
 169  
     
 170  
     public String getOutputSubject() {
 171  0
         return outputSubject;
 172  
     }
 173  
     
 174  
     /** 
 175  
      * Sets the subject (i.e. destination name) to send messages to
 176  
      */
 177  
     public void setOutputSubject(String outputSubject) {
 178  0
         this.outputSubject = outputSubject;
 179  0
     }
 180  
     
 181  
     /**
 182  
      * Gets the Messenger used to output messages 
 183  
      */
 184  
     public Messenger getOutputMessenger() throws JMSException {
 185  0
         if ( outputMessenger == null ) {
 186  0
             String name = getOutputConnection();
 187  0
             if ( name != null ) {
 188  0
                 outputMessenger = getMessengerManager().getMessenger( name );
 189  
             }
 190  
             else {
 191  
                 // default to the input messenger
 192  0
                 outputMessenger = getMessenger();
 193  
             }
 194  
         }
 195  0
         return outputMessenger;
 196  
     }
 197  
     
 198  
     /**
 199  
      * Sets the Messenger used to output messages 
 200  
      */
 201  
     public void setOutputMessenger(Messenger outputMessenger) {
 202  0
         this.outputMessenger = outputMessenger;
 203  0
     }
 204  
     
 205  
     /**
 206  
      * Gets the Destination output messages will be sent to
 207  
      */
 208  
     public Destination getOutputDestination() throws JMSException {
 209  0
         if ( outputDestination == null ) {
 210  0
             String subject = getOutputSubject();
 211  0
             if ( subject == null ) {
 212  0
                 throw new JMSException( "A bridge must have an outputSubject defined!" );
 213  
             }
 214  0
             outputDestination = getOutputMessenger().getDestination( subject );
 215  
         }
 216  0
         return outputDestination;
 217  
     }
 218  
     
 219  
     /**
 220  
      * Sets the Destination output messages will be sent to
 221  
      */
 222  
     public void setOutputDestination(Destination outputDestination) {
 223  0
         this.outputDestination = outputDestination;
 224  0
     }
 225  
     
 226  
     /**
 227  
      * Gets the buffer size used for ByteMessage and StreamMessage copying
 228  
      */
 229  
     public int getBufferSize() {
 230  0
         return bufferSize;
 231  
     }
 232  
 
 233  
     /**
 234  
      * Sets the buffer size used for ByteMessage and StreamMessage copying
 235  
      */
 236  
     public void setBufferSize(int bufferSize) {
 237  0
         this.bufferSize = bufferSize;
 238  0
     }
 239  
     
 240  
     
 241  
     // Implementation methods
 242  
     //-------------------------------------------------------------------------
 243  
 
 244  
     /**
 245  
      * Strategy method to perform a commit() on both the incoming Messenger and the
 246  
      * output Messenger.
 247  
      */
 248  
     protected void commit() throws JMSException {
 249  0
         if ( transacted ) {
 250  0
             Messenger outputMessenger = getOutputMessenger();
 251  0
             Messenger inputMessenger = getMessenger();
 252  
             
 253  0
             if ( outputMessenger != inputMessenger ) {
 254  0
                 outputMessenger.commit();
 255  
             }
 256  0
             inputMessenger.commit();
 257  
         }
 258  0
     }
 259  
 
 260  
     /**
 261  
      * Strategy method to perform a rollback() on both the incoming Messenger and the
 262  
      * output Messenger.
 263  
      */
 264  
     protected void rollback() {
 265  0
         if ( transacted ) {
 266  
             try {
 267  0
                 Messenger outputMessenger = getOutputMessenger();
 268  0
                 Messenger inputMessenger = getMessenger();
 269  
                 
 270  0
                 if ( outputMessenger != inputMessenger ) {
 271  0
                         outputMessenger.rollback();
 272  
                 }
 273  
             }
 274  0
             catch (Exception e) {
 275  0
                 log.error( "Caught exception rolling back the output messenger: " + e, e );
 276  0
             }
 277  
             
 278  
             try {
 279  0
                 getMessenger().rollback();
 280  
             }
 281  0
             catch (Exception e) {
 282  0
                 log.error( "Caught exception rolling back the input messenger: " + e, e );
 283  0
             }
 284  
         }
 285  0
     }
 286  
 
 287  
     
 288  
     /**
 289  
      * Factory method to create an output message given an input message.
 290  
      * Derived classes could override this method to perform any kind of 
 291  
      * Message transformation.
 292  
      */
 293  
     protected Message createOutputMessage(Message inputMessage) throws JMSException {
 294  0
         Message outputMessage = null;
 295  
         
 296  0
         if ( inputMessage instanceof TextMessage ) {
 297  0
             outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
 298  
         }
 299  0
         else if ( inputMessage instanceof ObjectMessage ) {
 300  0
             outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
 301  
         }
 302  0
         else if ( inputMessage instanceof MapMessage ) {
 303  0
             outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
 304  
         }
 305  0
         else if ( inputMessage instanceof BytesMessage ) {
 306  0
             outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
 307  
         }
 308  0
         else if ( inputMessage instanceof StreamMessage ) {
 309  0
             outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
 310  
         }
 311  
         else {
 312  0
             outputMessage = getOutputMessenger().createMessage();
 313  
         }
 314  
         
 315  0
         processMessageHeaders(inputMessage, outputMessage);
 316  
         
 317  0
         return outputMessage;
 318  
     }
 319  
         
 320  
     /**
 321  
      * Factory method to create ObjectMessage 
 322  
      * Derived classes could override this method to perform any kind of 
 323  
      * Message transformation.
 324  
      */
 325  
     protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
 326  0
         return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
 327  
     }
 328  
     
 329  
     /**
 330  
      * Factory method to create TextMessage 
 331  
      * Derived classes could override this method to perform any kind of 
 332  
      * Message transformation.
 333  
      */
 334  
     protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
 335  0
         return getOutputMessenger().createTextMessage( inputMessage.getText() );
 336  
     }
 337  
     
 338  
     /**
 339  
      * Factory method to create MapMessage 
 340  
      * Derived classes could override this method to perform any kind of 
 341  
      * Message transformation.
 342  
      */
 343  
     protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
 344  0
         MapMessage answer = getOutputMessenger().createMapMessage();
 345  
         
 346  
         // copy across all values
 347  0
         for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
 348  0
             String name = (String) e.nextElement();
 349  0
             Object value = inputMessage.getObject( name );
 350  0
             answer.setObject( name, value );
 351  0
         }
 352  0
         return answer;
 353  
     }
 354  
     
 355  
     /**
 356  
      * Factory method to create BytesMessage 
 357  
      * Derived classes could override this method to perform any kind of 
 358  
      * Message transformation.
 359  
      */
 360  
     protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
 361  0
         BytesMessage answer = getOutputMessenger().createBytesMessage();
 362  
         
 363  
         // copy across all data
 364  0
         byte[] buffer = new byte[bufferSize];
 365  
         while (true ) {
 366  0
             int size = inputMessage.readBytes( buffer );
 367  0
             if ( size <= 0 ) {
 368  0
                 break;
 369  
             }
 370  0
             answer.writeBytes( buffer, 0, size );
 371  0
             if ( size < bufferSize ) {
 372  0
                 break;
 373  
             }
 374  0
         }
 375  0
         return answer;
 376  
     }
 377  
     
 378  
     /**
 379  
      * Factory method to create StreamMessage 
 380  
      * Derived classes could override this method to perform any kind of 
 381  
      * Message transformation.
 382  
      */
 383  
     protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
 384  0
         StreamMessage answer = getOutputMessenger().createStreamMessage();
 385  
         
 386  
         // copy across all data
 387  0
         byte[] buffer = new byte[bufferSize];
 388  
         while (true ) {
 389  0
             int size = inputMessage.readBytes( buffer );
 390  0
             if ( size <= 0 ) {
 391  0
                 break;
 392  
             }
 393  0
             answer.writeBytes( buffer, 0, size );
 394  0
             if ( size < bufferSize ) {
 395  0
                 break;
 396  
             }
 397  0
         }
 398  0
         return answer;
 399  
     }
 400  
     
 401  
     
 402  
     
 403  
     /**
 404  
      * Strategy method to add any headers required on the output message.
 405  
      * Derived classes could override this method to perform any kind of 
 406  
      * header processing, such as copying the correlation ID, copying all
 407  
      * headers or adding some new custom headers etc.
 408  
      */
 409  
     protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
 410  0
     }
 411  
      
 412  
     /**
 413  
      * Strategy method to allow different derived classes to acknowledge
 414  
      * messages differently, such as to disable acknowledgements
 415  
      */
 416  
     protected void acknowledge(Message message) throws JMSException {
 417  0
         message.acknowledge();
 418  0
     }            
 419  
     
 420  
     /**
 421  
      * Validates that there is a valid output destintation that we can use.
 422  
      * Derivations of this class could use multiple destinations
 423  
      */
 424  
     protected void validateOutputDestination() throws JMSException, ServletException {
 425  0
         if ( getOutputDestination() == null ) {
 426  0
             throw new ServletException( "No output Destination is defined for this Bridge" );
 427  
         }
 428  0
     }
 429  
 }
 430  
 
 431