Coverage Report - org.apache.commons.messenger.DefaultMessenger
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultMessenger
0%
0/95
0%
0/20
1.682
 
 1  
 /*
 2  
  * Copyright 1999-2004 The Apache Software Foundation.
 3  
  * 
 4  
  * Licensed under the Apache License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  * 
 8  
  *      http://www.apache.org/licenses/LICENSE-2.0
 9  
  * 
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.apache.commons.messenger;
 17  
 
 18  
 import javax.jms.Connection;
 19  
 import javax.jms.ConnectionFactory;
 20  
 import javax.jms.Destination;
 21  
 import javax.jms.JMSException;
 22  
 import javax.jms.Message;
 23  
 import javax.jms.MessageConsumer;
 24  
 import javax.jms.MessageListener;
 25  
 import javax.jms.MessageProducer;
 26  
 import javax.jms.Queue;
 27  
 import javax.jms.QueueSender;
 28  
 import javax.jms.QueueSession;
 29  
 import javax.jms.ServerSessionPool;
 30  
 import javax.jms.Session;
 31  
 import javax.jms.Topic;
 32  
 import javax.jms.TopicPublisher;
 33  
 import javax.jms.TopicSession;
 34  
 import javax.naming.Context;
 35  
 
 36  
 import org.apache.commons.logging.Log;
 37  
 import org.apache.commons.logging.LogFactory;
 38  
 
 39  
 /** <p><code>DefaultMessenger</code> is the default implementation of
 40  
   * Messenger which uses a {@link ThreadLocal} variable
 41  
   * to keep the JMS Session that should be used for a given calling thread.</p>
 42  
   *
 43  
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 44  
   * @version $Revision: 155459 $
 45  
   */
 46  
 public class DefaultMessenger extends MessengerSupport {
 47  
 
 48  
     private static final boolean SHARE_CONNECTION = true;
 49  
 
 50  
     /** Logger */
 51  0
     private static final Log log = LogFactory.getLog(DefaultMessenger.class);
 52  
 
 53  
     /** the MessengerSession for each thread */
 54  0
     private ThreadLocal messengerSessionPool = new ThreadLocal();
 55  
 
 56  
     /** the SessionFactory used to create new JMS sessions */
 57  
     private SessionFactory sessionFactory;
 58  
 
 59  0
     public DefaultMessenger() {
 60  0
     }
 61  
 
 62  
     /** Returns the SessionFactory used to create new JMS sessions */
 63  
     public SessionFactory getSessionFactory() throws JMSException {
 64  0
         if (sessionFactory == null) {
 65  0
             sessionFactory = createSessionFactory();
 66  
         }
 67  0
         return sessionFactory;
 68  
     }
 69  
 
 70  
     /** Sets the SessionFactory used to create new JMS sessions */
 71  
     public void setSessionFactory(SessionFactory sessionFactory) {
 72  0
         this.sessionFactory = sessionFactory;
 73  0
     }
 74  
 
 75  
     public Connection getConnection() throws JMSException {
 76  0
         return getSessionFactory().getConnection();
 77  
     }
 78  
 
 79  
     public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
 80  
         throws JMSException {
 81  0
         return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
 82  
     }
 83  
 
 84  
     public synchronized void close() throws JMSException {
 85  
         /** note that only the current session is terminated */
 86  0
         MessengerSession session = getMessengerSession();
 87  
 
 88  
         // clear all the pools...
 89  0
         messengerSessionPool = new ThreadLocal();
 90  
 
 91  0
         session.close();
 92  0
         getSessionFactory().close();
 93  0
     }
 94  
 
 95  
     public Session getSession() throws JMSException {
 96  0
         return getMessengerSession().getSession();
 97  
     }
 98  
 
 99  
     public Session getAsyncSession() throws JMSException {
 100  0
         return getMessengerSession().getListenerSession();
 101  
     }
 102  
 
 103  
     public Message call(Destination destination, Message message) throws JMSException {
 104  0
         MessengerSession session = borrowMessengerSession();
 105  
         try {
 106  0
             Destination replyTo = getReplyToDestination();
 107  0
             message.setJMSReplyTo(replyTo);
 108  
 
 109  
             // NOTE - we could consider adding a correlation ID per request so that we can ignore
 110  
             // any cruft or old messages that are sent onto our inbound queue.
 111  
             //
 112  
             // Though that does mean that we must then rely on the inbound message having
 113  
             // the right correlationID. Though at least this strategy would mean
 114  
             // that we could have a single consumer on a temporary queue for all threads
 115  
             // and use correlation IDs to dispatch to the corrent thread
 116  
             //
 117  
             // Maybe this should be a configurable strategy
 118  
 
 119  0
             MessageProducer producer = session.getMessageProducer(destination);
 120  0
             MessageConsumer consumer = getReplyToConsumer();
 121  
 
 122  0
             if (session.isTopic()) {
 123  0
                 ((TopicPublisher) producer).publish((Topic) destination, message);
 124  
             }
 125  
             else {
 126  0
                 ((QueueSender) producer).send((Queue) destination, message);
 127  
             }
 128  0
             Message response = consumer.receive();
 129  0
             if (response == null) {
 130  
                 // we could have timed out so lets trash the temporary destination
 131  
                 // so that the next call() method will use a new destination to avoid
 132  
                 // the response for this call() coming back on later call() invokcations
 133  0
                 clearReplyToDestination();
 134  
             }
 135  0
             return response;
 136  
         }
 137  
         finally {
 138  0
             returnMessengerSession(session);
 139  
         }
 140  
     }
 141  
 
 142  
     public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
 143  0
         MessengerSession session = borrowMessengerSession();
 144  
         try {
 145  0
             Destination replyTo = getReplyToDestination();
 146  0
             message.setJMSReplyTo(replyTo);
 147  
 
 148  
             // NOTE - we could consider adding a correlation ID per request so that we can ignore
 149  
             // any cruft or old messages that are sent onto our inbound queue.
 150  
             //
 151  
             // Though that does mean that we must then rely on the inbound message having
 152  
             // the right correlationID. Though at least this strategy would mean
 153  
             // that we could have a single consumer on a temporary queue for all threads
 154  
             // and use correlation IDs to dispatch to the corrent thread
 155  
             //
 156  
             // Maybe this should be a configurable strategy
 157  
 
 158  0
             MessageProducer producer = session.getMessageProducer(destination);
 159  
 
 160  0
             MessageConsumer consumer = getReplyToConsumer();
 161  0
             if (session.isTopic()) {
 162  0
                 ((TopicPublisher) producer).publish((Topic) destination, message);
 163  
             }
 164  
             else {
 165  0
                 ((QueueSender) producer).send((Queue) destination, message);
 166  
             }
 167  0
             Message response = consumer.receive(timeoutMillis);
 168  0
             if (response == null) {
 169  
                 // we could have timed out so lets trash the temporary destination
 170  
                 // so that the next call() method will use a new destination to avoid
 171  
                 // the response for this call() coming back on later call() invokcations
 172  0
                 clearReplyToDestination();
 173  
             }
 174  0
             return response;
 175  
         }
 176  
         finally {
 177  0
             returnMessengerSession(session);
 178  
         }
 179  
     }
 180  
 
 181  
     // Implementation methods
 182  
     //-------------------------------------------------------------------------
 183  
     protected boolean isTopic(Connection connection) throws JMSException {
 184  0
         return getSessionFactory().isTopic();
 185  
     }
 186  
 
 187  
     protected boolean isTopic(ConnectionFactory factory) throws JMSException {
 188  0
         return getSessionFactory().isTopic();
 189  
     }
 190  
 
 191  
     /**
 192  
      * @return the MessageConsumer for this threads temporary destination
 193  
      * which is cached for the duration of this process.
 194  
      */
 195  
     protected MessageConsumer getReplyToConsumer() throws JMSException {
 196  0
         MessengerSession messengerSession = getMessengerSession();
 197  0
         MessageConsumer consumer = messengerSession.getReplyToConsumer();
 198  0
         synchronized ( messengerSession ) {
 199  0
             if (consumer == null) {
 200  0
                 consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
 201  0
                 messengerSession.setReplyToConsumer(consumer);
 202  
             }
 203  0
         }
 204  0
         return consumer;
 205  
     }
 206  
 
 207  
     /**
 208  
      * Clears the temporary destination used to receive reply-to messages
 209  
      * which will lazily force a new destination and consumer to be created next
 210  
      * time a call() method is invoked.
 211  
      */
 212  
     protected void clearReplyToDestination() throws JMSException {
 213  0
         MessengerSession messengerSession = getMessengerSession();
 214  
 
 215  0
         messengerSession.setReplyToDestination(null);
 216  0
         MessageConsumer consumer = messengerSession.getReplyToConsumer();
 217  0
         if (consumer != null) {
 218  0
             messengerSession.setReplyToConsumer(null);
 219  
 
 220  
             // ensure that everything is nullified first before we close
 221  
             // just in case an exception occurs
 222  0
             consumer.close();
 223  
         }
 224  0
     }
 225  
 
 226  
     protected Destination getReplyToDestination() throws JMSException {
 227  0
         return getMessengerSession().getReplyToDestination();
 228  
     }
 229  
 
 230  
     protected MessengerSession getMessengerSession() throws JMSException {
 231  0
         return borrowMessengerSession();
 232  
     }
 233  
 
 234  
     protected MessengerSession borrowMessengerSession() throws JMSException {
 235  0
         MessengerSession answer = (MessengerSession) messengerSessionPool.get();
 236  0
         if (answer == null) {
 237  0
             answer = createMessengerSession();
 238  0
             messengerSessionPool.set(answer);
 239  
         }
 240  0
         return answer;
 241  
     }
 242  
 
 243  
     protected void returnMessengerSession(MessengerSession session) {
 244  0
     }
 245  
 
 246  
     /**
 247  
      * Factory method to create a new MessengerSession
 248  
      */
 249  
     protected MessengerSession createMessengerSession() throws JMSException {
 250  0
         return new MessengerSession(this, getSessionFactory());
 251  
     }
 252  
 
 253  
     /** Factory method to create a SessionFactory.
 254  
       * Derived classes could override this method to create the SessionFactory
 255  
       * from a well known place
 256  
       */
 257  
     protected SessionFactory createSessionFactory() throws JMSException {
 258  0
         throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
 259  
     }
 260  
 
 261  
     public Queue getQueue(QueueSession session, String subject) throws JMSException {
 262  
         // XXXX: might want to cache
 263  0
         Context ctx = null;
 264  0
         JNDISessionFactory factory = null;
 265  
         
 266  0
         Queue queue = null;
 267  0
         if (isJndiDestinations()) {
 268  
             try {
 269  0
                 factory = (JNDISessionFactory) getSessionFactory();
 270  0
                 ctx = factory.getContext();
 271  0
                 queue = (Queue) ctx.lookup(subject);
 272  
             }
 273  0
             catch (Exception e) {
 274  0
                 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
 275  0
             }
 276  
         }
 277  
         else {
 278  
             // XXXX: might want to cache
 279  0
             queue = session.createQueue(subject);
 280  
         }
 281  0
         return queue;
 282  
     }
 283  
 
 284  
     public Topic getTopic(TopicSession session, String subject) throws JMSException {
 285  
         // XXXX: might want to cache
 286  0
         Context ctx = null;
 287  0
         JNDISessionFactory factory = null;
 288  
         
 289  0
         Topic topic = null;
 290  0
         if (isJndiDestinations()) {
 291  
             try {
 292  0
                 factory = (JNDISessionFactory) getSessionFactory();
 293  0
                 ctx = factory.getContext();
 294  0
                 topic = (Topic) ctx.lookup(subject);
 295  
             }
 296  0
             catch (Exception e) {
 297  0
                 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
 298  0
             }
 299  
         }
 300  
         else {
 301  0
             topic = session.createTopic(subject);
 302  
         }
 303  0
         return topic;
 304  
     }
 305  
 
 306  
 }