Coverage Report - org.apache.commons.messagelet.XAConsumerThread
 
Classes in this File Line Coverage Branch Coverage Complexity
XAConsumerThread
0%
0/53
0%
0/12
1.818
 
 1  
 /*
 2  
  * Copyright (C) The Apache Software Foundation. All rights reserved.
 3  
  *
 4  
  * This software is published under the terms of the Apache Software License
 5  
  * version 1.1, a copy of which has been included with this distribution in
 6  
  * the LICENSE file.
 7  
  * 
 8  
  * $Id: XAConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
 9  
  */
 10  
 package org.apache.commons.messagelet;
 11  
 
 12  
 import javax.jms.MessageListener;
 13  
 import javax.transaction.SystemException;
 14  
 import javax.transaction.Transaction;
 15  
 import javax.transaction.TransactionManager;
 16  
 import javax.transaction.xa.XAResource;
 17  
 
 18  
 import org.apache.commons.logging.Log;
 19  
 import org.apache.commons.logging.LogFactory;
 20  
 import org.apache.commons.messenger.Messenger;
 21  
 import org.apache.commons.messenger.XACapable;
 22  
 import org.apache.commons.messenger.XACapableAdapter;
 23  
 
 24  
 /** 
 25  
  * <p><code>XAConsumerThread</code> is a thread which will perform XA processing
 26  
  * of messages
 27  
  *
 28  
  * @author damon.hamacha
 29  
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 30  
  * @version $Revision: 155459 $
 31  
  */
 32  
 public class XAConsumerThread extends ConsumerThread {
 33  
 
 34  
         /** Logger */
 35  0
         private static final Log log = LogFactory.getLog(XAConsumerThread.class);
 36  
 
 37  
         private TransactionManager transctionManager;
 38  
         private Transaction transaction;
 39  
 
 40  0
         public XAConsumerThread() {
 41  0
                 setName("XAConsumer" + getName());
 42  0
         }
 43  
 
 44  
         /**
 45  
          * @return the TransactionManager to be used
 46  
          * @throws SystemException
 47  
          */
 48  
         public TransactionManager getTransactionManager() throws SystemException {
 49  0
                 if (transctionManager == null) {
 50  0
                         transctionManager = createTransactionManager();
 51  
                 }
 52  0
                 return transctionManager;
 53  
         }
 54  
 
 55  
         /**
 56  
          * Sets the transaction manager to be used
 57  
          * 
 58  
          * @param transctionManager the transaction manager to be used
 59  
          */
 60  
         public void setTransactionManager(TransactionManager transctionManager) {
 61  0
                 this.transctionManager = transctionManager;
 62  0
         }
 63  
 
 64  
         // Implementation methods
 65  
     //-------------------------------------------------------------------------    
 66  
         
 67  
         /**
 68  
          * Factory method to create a TransactionManager via some mechanism.
 69  
          * By default this mechanism will lookup in JNDI 
 70  
          */
 71  
         protected TransactionManager createTransactionManager() throws SystemException {
 72  0
                 return null;
 73  
         }
 74  
         
 75  
                         
 76  
         /**
 77  
          * Enlists any resources with the current transaction.
 78  
          * Typically the input Messenger's Session will always be
 79  
          * enlisted. Then if the current MessageListener implements XACapable
 80  
          * then any of its resources will also be enlisted.
 81  
          * 
 82  
          * @param transaction the transaction to enlist resources with
 83  
          * @throws Exception if the enlistment fails for whatever reason
 84  
          */
 85  
         protected void enlist(Transaction transaction) throws Exception {
 86  0
                 XACapable xaCapable = getXACapable( getMessenger() );
 87  0
                 xaCapable.enlistResources(transaction);
 88  
         
 89  0
                 MessageListener listener = getListener();
 90  0
                 if (listener instanceof XACapable) {
 91  0
                         xaCapable = (XACapable) listener;
 92  0
                         xaCapable.enlistResources(transaction);
 93  
                 }
 94  
                 
 95  0
                 if (listener instanceof BridgeMDO) {
 96  0
                         BridgeMDO bridge = (BridgeMDO) listener;
 97  0
                         xaCapable = getXACapable( bridge.getOutputMessenger() );
 98  0
                         xaCapable.enlistResources(transaction);
 99  
                 }
 100  0
         }
 101  
                 
 102  
 
 103  
 
 104  
         /**
 105  
          * Delists any resources from the current transaction.
 106  
          * This includes the current input Messenger's Session as well
 107  
          * as any resources used by the MessageListener if it implements
 108  
          * XACapable
 109  
          * 
 110  
          * @param transaction
 111  
          * @param flag is the flag used by JTA when delisting resources.
 112  
          * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
 113  
          * @throws Exception
 114  
          */
 115  
         protected void delist(Transaction transaction, int flag) throws Exception {
 116  0
                 XACapable xaCapable = getXACapable( getMessenger() );
 117  0
                 xaCapable.delistResources(transaction, flag);
 118  
                 
 119  0
                 MessageListener listener = getListener();
 120  0
                 if (listener instanceof XACapable) {
 121  0
                         xaCapable = (XACapable) listener;
 122  0
                         xaCapable.delistResources(transaction, flag);
 123  
                 }
 124  
 
 125  0
                 if (listener instanceof BridgeMDO) {
 126  0
                         BridgeMDO bridge = (BridgeMDO) listener;
 127  0
                         xaCapable = getXACapable( bridge.getOutputMessenger() );
 128  0
                         xaCapable.delistResources(transaction, flag);
 129  
                 }
 130  0
         }
 131  
 
 132  
         /**
 133  
          * Strategy method to represent the code required to start
 134  
          * a transaction.
 135  
          */
 136  
         protected void startTransaction() throws Exception {
 137  0
                 getTransactionManager().begin();
 138  0
                 transaction = getTransactionManager().getTransaction();
 139  
 
 140  0
                 enlist(transaction);
 141  0
         }
 142  
 
 143  
         /**
 144  
          * Strategy method to represent the code required to commit
 145  
          * a transaction.
 146  
          */
 147  
         protected void commitTransaction() throws Exception {
 148  0
                 delist(transaction, XAResource.TMSUCCESS);
 149  
                 try {
 150  0
                         transaction.commit();
 151  
                 }
 152  0
                 catch (Exception e) {
 153  0
                         log.error("Caught exception while committing txn: " + e, e);
 154  0
                         transaction.setRollbackOnly();
 155  0
                         throw e;                
 156  0
                 }
 157  0
         }
 158  
 
 159  
         /**
 160  
          * Strategy method to represent the code required to rollback
 161  
          * a transaction.
 162  
          */
 163  
         protected void rollbackTransaction() throws Exception {
 164  0
                 delist(transaction, XAResource.TMFAIL);
 165  0
                 transaction.rollback();
 166  0
         }
 167  
 
 168  
         /**
 169  
          * Strategy method to represent the code required to cancel
 170  
          * a transaction. 
 171  
          * This is called when a message is not received.
 172  
          */
 173  
         protected void cancelTransaction() throws Exception {
 174  0
                 delist(transaction, XAResource.TMFAIL);
 175  0
                 transaction.rollback();
 176  0
         }
 177  
 
 178  
         /**
 179  
          * @return an XACapable for the given Messenger
 180  
          */
 181  
         protected XACapable getXACapable(Messenger messenger) {                
 182  0
                 if (messenger instanceof XACapable) {
 183  0
                         return (XACapable) messenger;
 184  
                 }
 185  0
                 return new XACapableAdapter(messenger);
 186  
         }
 187  
 }