Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
XAConsumerThread |
|
| 1.8181818181818181;1.818 |
1 | /* | |
2 | * Copyright (C) The Apache Software Foundation. All rights reserved. | |
3 | * | |
4 | * This software is published under the terms of the Apache Software License | |
5 | * version 1.1, a copy of which has been included with this distribution in | |
6 | * the LICENSE file. | |
7 | * | |
8 | * $Id: XAConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $ | |
9 | */ | |
10 | package org.apache.commons.messagelet; | |
11 | ||
12 | import javax.jms.MessageListener; | |
13 | import javax.transaction.SystemException; | |
14 | import javax.transaction.Transaction; | |
15 | import javax.transaction.TransactionManager; | |
16 | import javax.transaction.xa.XAResource; | |
17 | ||
18 | import org.apache.commons.logging.Log; | |
19 | import org.apache.commons.logging.LogFactory; | |
20 | import org.apache.commons.messenger.Messenger; | |
21 | import org.apache.commons.messenger.XACapable; | |
22 | import org.apache.commons.messenger.XACapableAdapter; | |
23 | ||
24 | /** | |
25 | * <p><code>XAConsumerThread</code> is a thread which will perform XA processing | |
26 | * of messages | |
27 | * | |
28 | * @author damon.hamacha | |
29 | * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> | |
30 | * @version $Revision: 155459 $ | |
31 | */ | |
32 | public class XAConsumerThread extends ConsumerThread { | |
33 | ||
34 | /** Logger */ | |
35 | 0 | private static final Log log = LogFactory.getLog(XAConsumerThread.class); |
36 | ||
37 | private TransactionManager transctionManager; | |
38 | private Transaction transaction; | |
39 | ||
40 | 0 | public XAConsumerThread() { |
41 | 0 | setName("XAConsumer" + getName()); |
42 | 0 | } |
43 | ||
44 | /** | |
45 | * @return the TransactionManager to be used | |
46 | * @throws SystemException | |
47 | */ | |
48 | public TransactionManager getTransactionManager() throws SystemException { | |
49 | 0 | if (transctionManager == null) { |
50 | 0 | transctionManager = createTransactionManager(); |
51 | } | |
52 | 0 | return transctionManager; |
53 | } | |
54 | ||
55 | /** | |
56 | * Sets the transaction manager to be used | |
57 | * | |
58 | * @param transctionManager the transaction manager to be used | |
59 | */ | |
60 | public void setTransactionManager(TransactionManager transctionManager) { | |
61 | 0 | this.transctionManager = transctionManager; |
62 | 0 | } |
63 | ||
64 | // Implementation methods | |
65 | //------------------------------------------------------------------------- | |
66 | ||
67 | /** | |
68 | * Factory method to create a TransactionManager via some mechanism. | |
69 | * By default this mechanism will lookup in JNDI | |
70 | */ | |
71 | protected TransactionManager createTransactionManager() throws SystemException { | |
72 | 0 | return null; |
73 | } | |
74 | ||
75 | ||
76 | /** | |
77 | * Enlists any resources with the current transaction. | |
78 | * Typically the input Messenger's Session will always be | |
79 | * enlisted. Then if the current MessageListener implements XACapable | |
80 | * then any of its resources will also be enlisted. | |
81 | * | |
82 | * @param transaction the transaction to enlist resources with | |
83 | * @throws Exception if the enlistment fails for whatever reason | |
84 | */ | |
85 | protected void enlist(Transaction transaction) throws Exception { | |
86 | 0 | XACapable xaCapable = getXACapable( getMessenger() ); |
87 | 0 | xaCapable.enlistResources(transaction); |
88 | ||
89 | 0 | MessageListener listener = getListener(); |
90 | 0 | if (listener instanceof XACapable) { |
91 | 0 | xaCapable = (XACapable) listener; |
92 | 0 | xaCapable.enlistResources(transaction); |
93 | } | |
94 | ||
95 | 0 | if (listener instanceof BridgeMDO) { |
96 | 0 | BridgeMDO bridge = (BridgeMDO) listener; |
97 | 0 | xaCapable = getXACapable( bridge.getOutputMessenger() ); |
98 | 0 | xaCapable.enlistResources(transaction); |
99 | } | |
100 | 0 | } |
101 | ||
102 | ||
103 | ||
104 | /** | |
105 | * Delists any resources from the current transaction. | |
106 | * This includes the current input Messenger's Session as well | |
107 | * as any resources used by the MessageListener if it implements | |
108 | * XACapable | |
109 | * | |
110 | * @param transaction | |
111 | * @param flag is the flag used by JTA when delisting resources. | |
112 | * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL | |
113 | * @throws Exception | |
114 | */ | |
115 | protected void delist(Transaction transaction, int flag) throws Exception { | |
116 | 0 | XACapable xaCapable = getXACapable( getMessenger() ); |
117 | 0 | xaCapable.delistResources(transaction, flag); |
118 | ||
119 | 0 | MessageListener listener = getListener(); |
120 | 0 | if (listener instanceof XACapable) { |
121 | 0 | xaCapable = (XACapable) listener; |
122 | 0 | xaCapable.delistResources(transaction, flag); |
123 | } | |
124 | ||
125 | 0 | if (listener instanceof BridgeMDO) { |
126 | 0 | BridgeMDO bridge = (BridgeMDO) listener; |
127 | 0 | xaCapable = getXACapable( bridge.getOutputMessenger() ); |
128 | 0 | xaCapable.delistResources(transaction, flag); |
129 | } | |
130 | 0 | } |
131 | ||
132 | /** | |
133 | * Strategy method to represent the code required to start | |
134 | * a transaction. | |
135 | */ | |
136 | protected void startTransaction() throws Exception { | |
137 | 0 | getTransactionManager().begin(); |
138 | 0 | transaction = getTransactionManager().getTransaction(); |
139 | ||
140 | 0 | enlist(transaction); |
141 | 0 | } |
142 | ||
143 | /** | |
144 | * Strategy method to represent the code required to commit | |
145 | * a transaction. | |
146 | */ | |
147 | protected void commitTransaction() throws Exception { | |
148 | 0 | delist(transaction, XAResource.TMSUCCESS); |
149 | try { | |
150 | 0 | transaction.commit(); |
151 | } | |
152 | 0 | catch (Exception e) { |
153 | 0 | log.error("Caught exception while committing txn: " + e, e); |
154 | 0 | transaction.setRollbackOnly(); |
155 | 0 | throw e; |
156 | 0 | } |
157 | 0 | } |
158 | ||
159 | /** | |
160 | * Strategy method to represent the code required to rollback | |
161 | * a transaction. | |
162 | */ | |
163 | protected void rollbackTransaction() throws Exception { | |
164 | 0 | delist(transaction, XAResource.TMFAIL); |
165 | 0 | transaction.rollback(); |
166 | 0 | } |
167 | ||
168 | /** | |
169 | * Strategy method to represent the code required to cancel | |
170 | * a transaction. | |
171 | * This is called when a message is not received. | |
172 | */ | |
173 | protected void cancelTransaction() throws Exception { | |
174 | 0 | delist(transaction, XAResource.TMFAIL); |
175 | 0 | transaction.rollback(); |
176 | 0 | } |
177 | ||
178 | /** | |
179 | * @return an XACapable for the given Messenger | |
180 | */ | |
181 | protected XACapable getXACapable(Messenger messenger) { | |
182 | 0 | if (messenger instanceof XACapable) { |
183 | 0 | return (XACapable) messenger; |
184 | } | |
185 | 0 | return new XACapableAdapter(messenger); |
186 | } | |
187 | } |