Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ConsumerThread |
|
| 1.5;1.5 |
1 | /* | |
2 | * Copyright (C) The Apache Software Foundation. All rights reserved. | |
3 | * | |
4 | * This software is published under the terms of the Apache Software License | |
5 | * version 1.1, a copy of which has been included with this distribution in | |
6 | * the LICENSE file. | |
7 | * | |
8 | * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $ | |
9 | */ | |
10 | package org.apache.commons.messagelet; | |
11 | ||
12 | import javax.jms.Destination; | |
13 | import javax.jms.JMSException; | |
14 | import javax.jms.Message; | |
15 | import javax.jms.MessageConsumer; | |
16 | import javax.jms.MessageListener; | |
17 | ||
18 | import org.apache.commons.logging.Log; | |
19 | import org.apache.commons.logging.LogFactory; | |
20 | import org.apache.commons.messenger.Messenger; | |
21 | ||
22 | /** | |
23 | * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages | |
24 | * using a receive() method on Messenger and then process the message. | |
25 | * This class is a good base class when implementing some kind of transactional processing of | |
26 | * JMS messages | |
27 | * | |
28 | * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> | |
29 | * @version $Revision: 155459 $ | |
30 | */ | |
31 | public class ConsumerThread extends Thread { | |
32 | ||
33 | /** Logger */ | |
34 | 0 | private static final Log log = LogFactory.getLog(ConsumerThread.class); |
35 | ||
36 | ||
37 | private MessageConsumer consumer; | |
38 | private Messenger messenger; | |
39 | private Destination destination; | |
40 | private String selector; | |
41 | private MessageListener listener; | |
42 | private boolean shouldStop; | |
43 | ||
44 | 0 | public ConsumerThread() { |
45 | 0 | setName("Consumer" + getName()); |
46 | 0 | } |
47 | ||
48 | ||
49 | /** | |
50 | * Starts all the JMS connections and consumes JMS messages, | |
51 | * passing them onto the MessageListener and Message Driven Objects | |
52 | */ | |
53 | public void run() { | |
54 | 0 | if (log.isDebugEnabled()) { |
55 | 0 | log.debug( "Starting consumer thread: " + getName()); |
56 | } | |
57 | try { | |
58 | 0 | startConsumer(); |
59 | } | |
60 | 0 | catch (JMSException e) { |
61 | 0 | log.error("Failed to start consumer thread: " + e, e); |
62 | 0 | setShouldStop(true); |
63 | 0 | } |
64 | ||
65 | 0 | while (! isShouldStop()) { |
66 | try { | |
67 | 0 | startTransaction(); |
68 | } | |
69 | 0 | catch (Exception e) { |
70 | 0 | log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e); |
71 | 0 | break; |
72 | 0 | } |
73 | ||
74 | try { | |
75 | 0 | Message message = receive(); |
76 | ||
77 | 0 | if (log.isTraceEnabled()) { |
78 | 0 | log.trace( "Found: " + message ); |
79 | } | |
80 | ||
81 | 0 | if (message != null) { |
82 | 0 | processMessage(message); |
83 | 0 | commitTransaction(); |
84 | } | |
85 | else { | |
86 | 0 | cancelTransaction(); |
87 | } | |
88 | } | |
89 | 0 | catch (Exception e) { |
90 | 0 | rollbackTransaction(e); |
91 | 0 | } |
92 | } | |
93 | ||
94 | try { | |
95 | 0 | stopConsumer(); |
96 | } | |
97 | 0 | catch (JMSException e) { |
98 | 0 | log.error("Failed to stop consuming messages: " + e, e); |
99 | 0 | } |
100 | 0 | } |
101 | ||
102 | // Properties | |
103 | //------------------------------------------------------------------------- | |
104 | ||
105 | /** | |
106 | * Returns the destination. | |
107 | * @return Destination | |
108 | */ | |
109 | public Destination getDestination() { | |
110 | 0 | return destination; |
111 | } | |
112 | ||
113 | /** | |
114 | * Returns the listener. | |
115 | * @return MessageListener | |
116 | */ | |
117 | public MessageListener getListener() { | |
118 | 0 | return listener; |
119 | } | |
120 | ||
121 | /** | |
122 | * Returns the messenger. | |
123 | * @return Messenger | |
124 | */ | |
125 | public Messenger getMessenger() { | |
126 | 0 | return messenger; |
127 | } | |
128 | ||
129 | /** | |
130 | * Returns the selector. | |
131 | * @return String | |
132 | */ | |
133 | public String getSelector() { | |
134 | 0 | return selector; |
135 | } | |
136 | ||
137 | /** | |
138 | * Returns the shouldStop. | |
139 | * @return boolean | |
140 | */ | |
141 | public boolean isShouldStop() { | |
142 | 0 | return shouldStop; |
143 | } | |
144 | ||
145 | /** | |
146 | * Sets the destination. | |
147 | * @param destination The destination to set | |
148 | */ | |
149 | public void setDestination(Destination destination) { | |
150 | 0 | this.destination = destination; |
151 | 0 | } |
152 | ||
153 | /** | |
154 | * Sets the listener. | |
155 | * @param listener The listener to set | |
156 | */ | |
157 | public void setListener(MessageListener listener) { | |
158 | 0 | this.listener = listener; |
159 | 0 | } |
160 | ||
161 | /** | |
162 | * Sets the messenger. | |
163 | * @param messenger The messenger to set | |
164 | */ | |
165 | public void setMessenger(Messenger messenger) { | |
166 | 0 | this.messenger = messenger; |
167 | 0 | } |
168 | ||
169 | /** | |
170 | * Sets the selector. | |
171 | * @param selector The selector to set | |
172 | */ | |
173 | public void setSelector(String selector) { | |
174 | 0 | this.selector = selector; |
175 | 0 | } |
176 | ||
177 | /** | |
178 | * Sets the shouldStop. | |
179 | * @param shouldStop The shouldStop to set | |
180 | */ | |
181 | public void setShouldStop(boolean shouldStop) { | |
182 | 0 | this.shouldStop = shouldStop; |
183 | 0 | } |
184 | ||
185 | // Implementation methods | |
186 | //------------------------------------------------------------------------- | |
187 | ||
188 | /** | |
189 | * Starts consuming messages | |
190 | */ | |
191 | protected void startConsumer() throws JMSException { | |
192 | 0 | consumer = createConsumer(); |
193 | 0 | } |
194 | ||
195 | /** | |
196 | * Stops consuming messages | |
197 | */ | |
198 | protected void stopConsumer() throws JMSException { | |
199 | 0 | consumer.close(); |
200 | 0 | } |
201 | ||
202 | /** | |
203 | * Factory method to create a new MessageConsumer | |
204 | */ | |
205 | protected MessageConsumer createConsumer() throws JMSException { | |
206 | 0 | String selector = getSelector(); |
207 | 0 | if (selector != null) { |
208 | 0 | return getMessenger().createConsumer(getDestination(), selector); |
209 | } | |
210 | else { | |
211 | 0 | return getMessenger().createConsumer(getDestination()); |
212 | } | |
213 | } | |
214 | ||
215 | /** | |
216 | * Strategy method to consume a message using a receive() kind of method. | |
217 | * @return the message or null if a message could not be found after waiting for | |
218 | * some period of time. | |
219 | */ | |
220 | private Message receive() throws JMSException { | |
221 | 0 | return getConsumer().receive(); |
222 | } | |
223 | ||
224 | /** | |
225 | * Strategy method to process a given message. | |
226 | * By default this will just invoke the MessageListener | |
227 | */ | |
228 | protected void processMessage(Message message) throws JMSException { | |
229 | 0 | MessageListener listener = getListener(); |
230 | 0 | if (listener != null) { |
231 | 0 | listener.onMessage(message); |
232 | } | |
233 | 0 | } |
234 | ||
235 | ||
236 | /** | |
237 | * Strategy method to represent the code required to start | |
238 | * a transaction. | |
239 | */ | |
240 | protected void startTransaction() throws Exception { | |
241 | 0 | } |
242 | ||
243 | /** | |
244 | * Strategy method to represent the code required to commit | |
245 | * a transaction. | |
246 | */ | |
247 | protected void commitTransaction() throws Exception { | |
248 | 0 | } |
249 | ||
250 | /** | |
251 | * Strategy method to represent the code required to rollback | |
252 | * a transaction. | |
253 | */ | |
254 | protected void rollbackTransaction(Exception e) { | |
255 | 0 | } |
256 | ||
257 | /** | |
258 | * Strategy method to represent the code required to cancel | |
259 | * a transaction. | |
260 | * This is called when a message is not received. | |
261 | */ | |
262 | protected void cancelTransaction() throws Exception { | |
263 | 0 | } |
264 | ||
265 | ||
266 | /** | |
267 | * @erturn the consumer of messages | |
268 | */ | |
269 | protected MessageConsumer getConsumer() { | |
270 | 0 | return consumer; |
271 | } | |
272 | } |