Coverage Report - org.apache.commons.messagelet.ConsumerThread
 
Classes in this File Line Coverage Branch Coverage Complexity
ConsumerThread
0%
0/65
0%
0/12
1.5
 
 1  
 /*
 2  
  * Copyright (C) The Apache Software Foundation. All rights reserved.
 3  
  *
 4  
  * This software is published under the terms of the Apache Software License
 5  
  * version 1.1, a copy of which has been included with this distribution in
 6  
  * the LICENSE file.
 7  
  * 
 8  
  * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
 9  
  */
 10  
 package org.apache.commons.messagelet;
 11  
 
 12  
 import javax.jms.Destination;
 13  
 import javax.jms.JMSException;
 14  
 import javax.jms.Message;
 15  
 import javax.jms.MessageConsumer;
 16  
 import javax.jms.MessageListener;
 17  
 
 18  
 import org.apache.commons.logging.Log;
 19  
 import org.apache.commons.logging.LogFactory;
 20  
 import org.apache.commons.messenger.Messenger;
 21  
 
 22  
 /** 
 23  
  * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages
 24  
  * using a receive() method on Messenger and then process the message.
 25  
  * This class is a good base class when implementing some kind of transactional processing of 
 26  
  * JMS messages.</p>
 27  
  *
 28  
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 29  
  * @version $Revision: 155459 $
 30  
  */
 31  
 public class ConsumerThread extends Thread {
 32  
 
 33  
     /** Logger */
 34  0
     private static final Log log = LogFactory.getLog(ConsumerThread.class);
 35  
 
 36  
 
 37  
     private MessageConsumer consumer;
 38  
     private Messenger messenger;
 39  
     private Destination destination;
 40  
     private String selector;
 41  
     private MessageListener listener;
 42  
     private boolean shouldStop;
 43  
     
 44  0
     public ConsumerThread() {
 45  0
         setName("Consumer" + getName());
 46  0
     }
 47  
 
 48  
 
 49  
     /**
 50  
      * Starts all the JMS connections and consumes JMS messages, 
 51  
      * passing them onto the MessageListener and Message Driven Objects
 52  
      */
 53  
     public void run() {
 54  0
         if (log.isDebugEnabled()) {
 55  0
             log.debug( "Starting consumer thread: " + getName());
 56  
         }
 57  
         try {
 58  0
             startConsumer();
 59  
         }
 60  0
         catch (JMSException e) {
 61  0
             log.error("Failed to start consumer thread: " + e, e);
 62  0
             setShouldStop(true);
 63  0
         }
 64  
         
 65  0
         while (! isShouldStop()) {
 66  
                 try {
 67  0
                     startTransaction();
 68  
                 }
 69  0
                 catch (Exception e) {
 70  0
                         log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
 71  0
                         break;
 72  0
                 }
 73  
 
 74  
             try {
 75  0
                 Message message = receive();
 76  
 
 77  0
                 if (log.isTraceEnabled()) {
 78  0
                     log.trace( "Found: " + message );
 79  
                 }
 80  
                 
 81  0
                 if (message != null) {
 82  0
                     processMessage(message);
 83  0
                     commitTransaction();
 84  
                 }
 85  
                 else {
 86  0
                     cancelTransaction();
 87  
                 }
 88  
             }
 89  0
             catch (Exception e) {
 90  0
                 rollbackTransaction(e);
 91  0
             }
 92  
         }
 93  
         
 94  
         try {
 95  0
             stopConsumer();
 96  
         }
 97  0
         catch (JMSException e) {
 98  0
             log.error("Failed to stop consuming messages: " + e, e);
 99  0
         }
 100  0
     }
 101  
     
 102  
     // Properties
 103  
     //-------------------------------------------------------------------------    
 104  
 
 105  
     /**
 106  
      * Returns the destination.
 107  
      * @return Destination
 108  
      */
 109  
     public Destination getDestination() {
 110  0
         return destination;
 111  
     }
 112  
 
 113  
     /**
 114  
      * Returns the listener.
 115  
      * @return MessageListener
 116  
      */
 117  
     public MessageListener getListener() {
 118  0
         return listener;
 119  
     }
 120  
 
 121  
     /**
 122  
      * Returns the messenger.
 123  
      * @return Messenger
 124  
      */
 125  
     public Messenger getMessenger() {
 126  0
         return messenger;
 127  
     }
 128  
 
 129  
     /**
 130  
      * Returns the selector.
 131  
      * @return String
 132  
      */
 133  
     public String getSelector() {
 134  0
         return selector;
 135  
     }
 136  
 
 137  
     /**
 138  
      * Returns the shouldStop.
 139  
      * @return boolean
 140  
      */
 141  
     public boolean isShouldStop() {
 142  0
         return shouldStop;
 143  
     }
 144  
 
 145  
     /**
 146  
      * Sets the destination.
 147  
      * @param destination The destination to set
 148  
      */
 149  
     public void setDestination(Destination destination) {
 150  0
         this.destination = destination;
 151  0
     }
 152  
 
 153  
     /**
 154  
      * Sets the listener.
 155  
      * @param listener The listener to set
 156  
      */
 157  
     public void setListener(MessageListener listener) {
 158  0
         this.listener = listener;
 159  0
     }
 160  
 
 161  
     /**
 162  
      * Sets the messenger.
 163  
      * @param messenger The messenger to set
 164  
      */
 165  
     public void setMessenger(Messenger messenger) {
 166  0
         this.messenger = messenger;
 167  0
     }
 168  
 
 169  
     /**
 170  
      * Sets the selector.
 171  
      * @param selector The selector to set
 172  
      */
 173  
     public void setSelector(String selector) {
 174  0
         this.selector = selector;
 175  0
     }
 176  
 
 177  
     /**
 178  
      * Sets the shouldStop.
 179  
      * @param shouldStop The shouldStop to set
 180  
      */
 181  
     public void setShouldStop(boolean shouldStop) {
 182  0
         this.shouldStop = shouldStop;
 183  0
     }
 184  
 
 185  
     // Implementation methods
 186  
     //-------------------------------------------------------------------------    
 187  
     
 188  
     /**
 189  
      * Starts consuming messages        
 190  
      */    
 191  
     protected void startConsumer() throws JMSException {
 192  0
         consumer = createConsumer();
 193  0
     }
 194  
 
 195  
     /**
 196  
      * Stops consuming messages        
 197  
      */    
 198  
     protected void stopConsumer() throws JMSException {
 199  0
         consumer.close();
 200  0
     }
 201  
 
 202  
     /**
 203  
      * Factory method to create a new MessageConsumer 
 204  
      */
 205  
     protected MessageConsumer createConsumer() throws JMSException {
 206  0
         String selector = getSelector();
 207  0
         if (selector != null) {
 208  0
             return getMessenger().createConsumer(getDestination(), selector);
 209  
         }
 210  
         else {
 211  0
             return getMessenger().createConsumer(getDestination());
 212  
         }
 213  
     }
 214  
 
 215  
     /**
 216  
      * Strategy method to consume a message using a receive() kind of method.
 217  
      * @return the message or null if a message could not be found after waiting for
 218  
      * some period of time.
 219  
      */
 220  
     private Message receive() throws JMSException {
 221  0
         return getConsumer().receive();
 222  
     }    
 223  
     
 224  
     /**
 225  
      * Strategy method to process a given message. 
 226  
      * By default this will just invoke the MessageListener
 227  
      */
 228  
     protected void processMessage(Message message) throws JMSException {
 229  0
         MessageListener listener = getListener();
 230  0
         if (listener != null) {
 231  0
             listener.onMessage(message);
 232  
         }
 233  0
     }
 234  
 
 235  
 
 236  
     /**
 237  
      * Strategy method to represent the code required to start
 238  
      * a transaction.
 239  
      */
 240  
     protected void startTransaction() throws Exception {
 241  0
     }
 242  
 
 243  
     /**
 244  
      * Strategy method to represent the code required to commit
 245  
      * a transaction.
 246  
      */
 247  
     protected void commitTransaction() throws Exception {
 248  0
     }
 249  
 
 250  
     /**
 251  
      * Strategy method to represent the code required to rollback
 252  
      * a transaction.
 253  
      */
 254  
     protected void rollbackTransaction(Exception e) {
 255  0
     }
 256  
 
 257  
     /**
 258  
      * Strategy method to represent the code required to cancel
 259  
      * a transaction. 
 260  
      * This is called when a message is not received.
 261  
      */
 262  
     protected void cancelTransaction() throws Exception {
 263  0
     }
 264  
 
 265  
 
 266  
     /**
 267  
      * @erturn the consumer of messages 
 268  
      */
 269  
     protected MessageConsumer getConsumer() {
 270  0
         return consumer;
 271  
     }
 272  
 }