Coverage Report - org.apache.commons.messenger.SimpleMessenger
 
Classes in this File Line Coverage Branch Coverage Complexity
SimpleMessenger
0%
0/78
0%
0/24
1.944
SimpleMessenger$ThreadLocalData
0%
0/4
N/A
1.944
 
 1  
 /*
 2  
  * Copyright 1999-2004 The Apache Software Foundation.
 3  
  * 
 4  
  * Licensed under the Apache License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  * 
 8  
  *      http://www.apache.org/licenses/LICENSE-2.0
 9  
  * 
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 package org.apache.commons.messenger;
 17  
 
 18  
 import java.util.LinkedList;
 19  
 
 20  
 import javax.jms.Connection;
 21  
 import javax.jms.ConnectionFactory;
 22  
 import javax.jms.Destination;
 23  
 import javax.jms.JMSException;
 24  
 import javax.jms.Message;
 25  
 import javax.jms.MessageConsumer;
 26  
 import javax.jms.MessageListener;
 27  
 import javax.jms.MessageProducer;
 28  
 import javax.jms.Queue;
 29  
 import javax.jms.QueueSender;
 30  
 import javax.jms.ServerSessionPool;
 31  
 import javax.jms.Session;
 32  
 import javax.jms.Topic;
 33  
 import javax.jms.TopicPublisher;
 34  
 
 35  
 import org.apache.commons.logging.Log;
 36  
 import org.apache.commons.logging.LogFactory;
 37  
 
 38  
 /** <p><code>SimpleMessenger</code> is an implementation of
 39  
   * Messenger which keeps a pool of MessengerSession instances that are
 40  
   * borrowed for each operation and returned to the pool afterwards.</p>
 41  
   *
 42  
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 43  
   * @version $Revision: 155459 $
 44  
   */
 45  
 public class SimpleMessenger extends MessengerSupport {
 46  
 
 47  
     /** Logger */
 48  0
     private static final Log log = LogFactory.getLog(SimpleMessenger.class);
 49  
 
 50  
     /** the SessionFactory used to create new JMS sessions */
 51  
     private SessionFactory sessionFactory;
 52  
 
 53  
     /** pool of MessengerSession instances */
 54  0
     private LinkedList pool = new LinkedList();
 55  
 
 56  
     /** thread local data for RPCs */    
 57  0
     private ThreadLocal threadLocalData = new ThreadLocal();
 58  
 
 59  
     private static int count;
 60  
 
 61  0
     public SimpleMessenger() {
 62  0
     }
 63  
 
 64  
     /** Returns the SessionFactory used to create new JMS sessions */
 65  
     public SessionFactory getSessionFactory() throws JMSException {
 66  0
         if (sessionFactory == null) {
 67  0
             sessionFactory = createSessionFactory();
 68  
         }
 69  0
         return sessionFactory;
 70  
     }
 71  
 
 72  
     /** Sets the SessionFactory used to create new JMS sessions */
 73  
     public void setSessionFactory(SessionFactory sessionFactory) {
 74  0
         this.sessionFactory = sessionFactory;
 75  0
     }
 76  
 
 77  
     public Connection getConnection() throws JMSException {
 78  0
         return getSessionFactory().getConnection();
 79  
     }
 80  
 
 81  
     public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
 82  
         throws JMSException {
 83  0
         return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
 84  
     }
 85  
 
 86  
     public synchronized void close() throws JMSException {
 87  0
         while (! pool.isEmpty()) {
 88  0
             MessengerSession session = (MessengerSession) pool.removeFirst();
 89  0
             session.close();
 90  0
         }
 91  
 
 92  0
         getSessionFactory().close();
 93  0
     }
 94  
 
 95  
     public Session getSession() throws JMSException {
 96  0
          throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
 97  
     }
 98  
 
 99  
     public Session getAsyncSession() throws JMSException {
 100  0
         throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
 101  
     }
 102  
 
 103  
     public Message call(Destination destination, Message message) throws JMSException {
 104  0
         ThreadLocalData data = null;
 105  0
         MessengerSession messengerSession = borrowMessengerSession();
 106  
         try {
 107  0
             data = getThreadLocalData(messengerSession.getSession());
 108  0
             Destination replyTo = data.destination;
 109  0
             message.setJMSReplyTo(replyTo);
 110  
         }
 111  
         finally {
 112  0
             returnMessengerSession(messengerSession);
 113  0
         }
 114  
  
 115  0
         log.info("Sending message to destination: " + destination);
 116  
 
 117  
     
 118  0
         send(destination, message);
 119  
  
 120  0
         messengerSession = borrowMessengerSession();
 121  
         try {    
 122  
             
 123  
 //            MessageProducer producer = messengerSession.getMessageProducer(destination);
 124  
 //            if (messengerSession.isTopic()) {
 125  
 //                ((TopicPublisher) producer).publish((Topic) destination, message);
 126  
 //            }
 127  
 //            else {
 128  
 //                ((QueueSender) producer).send((Queue) destination, message);
 129  
 //            }
 130  
 // 
 131  0
             log.info("Message sent - now waiting for a response...");
 132  
        
 133  0
             MessageConsumer consumer = data.consumer;
 134  0
             Message response = consumer.receive();
 135  
 //            Message response = null;
 136  0
             if (response == null) {
 137  
                 // we could have timed out, so let's discard the temporary destination
 138  
                 // so that the next call() method will use a new destination to avoid
 139  
                 // the response for this call() coming back on later call() invocations
 140  0
                 data.clear();
 141  
             }
 142  0
             return response;
 143  
         }
 144  
         finally {
 145  0
             returnMessengerSession(messengerSession);
 146  
         }
 147  
     }
 148  
 
 149  
     public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
 150  0
         MessengerSession messengerSession = borrowMessengerSession();
 151  
         try {
 152  0
             ThreadLocalData data = getThreadLocalData(messengerSession.getSession());
 153  0
             Destination replyTo = data.destination;
 154  0
             message.setJMSReplyTo(replyTo);
 155  
 
 156  0
             MessageProducer producer = messengerSession.getMessageProducer(destination);
 157  
             
 158  0
             MessageConsumer consumer = data.consumer;
 159  
 
 160  0
             if (messengerSession.isTopic()) {
 161  0
                 ((TopicPublisher) producer).publish((Topic) destination, message);
 162  
             }
 163  
             else {
 164  0
                 ((QueueSender) producer).send((Queue) destination, message);
 165  
             }
 166  0
             Message response = consumer.receive(timeoutMillis);
 167  0
             if (response == null) {
 168  
                 // we could have timed out, so let's discard the temporary destination
 169  
                 // so that the next call() method will use a new destination to avoid
 170  
                 // the response for this call() coming back on later call() invocations
 171  0
                 data.clear();
 172  
             }
 173  0
             return response;
 174  
         }
 175  
         finally {
 176  0
             returnMessengerSession(messengerSession);
 177  
         }
 178  
      }
 179  
 
 180  
 
 181  
     /**
 182  
      * @return the local thread data
 183  
      */
 184  
     protected ThreadLocalData getThreadLocalData(Session session) throws JMSException {
 185  0
         ThreadLocalData data = (ThreadLocalData) threadLocalData.get();
 186  0
         if (data == null) {
 187  0
             data = new ThreadLocalData();
 188  0
             threadLocalData.set(data);
 189  
         }
 190  0
         if (data.destination == null) {
 191  0
             data.destination = createTemporaryDestination();
 192  
         }
 193  0
         if (data.consumer == null) {
 194  0
             data.consumer = this.createConsumer(data.destination);
 195  
         }
 196  0
         return data;
 197  
     }
 198  
 
 199  
     // Implementation methods
 200  
     //-------------------------------------------------------------------------
 201  0
     protected static class ThreadLocalData {
 202  
         public MessageConsumer consumer;
 203  
         public Destination destination;
 204  
 
 205  
         public void clear() throws JMSException {
 206  0
             destination = null;
 207  0
             consumer.close();
 208  0
         }
 209  
     }
 210  
 
 211  
     protected boolean isTopic(Connection connection) throws JMSException {
 212  0
         return getSessionFactory().isTopic();
 213  
     }
 214  
 
 215  
     protected boolean isTopic(ConnectionFactory factory) throws JMSException {
 216  0
         return getSessionFactory().isTopic();
 217  
     }
 218  
 
 219  
 
 220  
 
 221  
     /**
 222  
      * Factory method to create a new MessengerSession
 223  
      */
 224  
     protected MessengerSession createMessengerSession() throws JMSException {
 225  0
         MessengerSession answer =  new MessengerSession(this, getSessionFactory());
 226  0
         if (log.isDebugEnabled()) {
 227  0
             log.debug("Created MessengerSession: " + ++count + " value: " + answer);
 228  
         }
 229  0
         return answer;
 230  
     }
 231  
 
 232  
     /** Factory method to create a SessionFactory.
 233  
       * Derived classes could override this method to create the SessionFactory
 234  
       * from a well known place
 235  
       */
 236  
     protected SessionFactory createSessionFactory() throws JMSException {
 237  0
         throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
 238  
     }
 239  
     
 240  
     protected synchronized MessengerSession borrowMessengerSession() throws JMSException {
 241  0
         MessengerSession answer = null;
 242  0
         if (pool.isEmpty()) {
 243  0
             answer = createMessengerSession();
 244  
         }
 245  
         else {
 246  0
             answer = (MessengerSession) pool.removeFirst();
 247  
         }
 248  0
         if (log.isDebugEnabled()) {
 249  0
             log.debug("#### Borrowing messenger session: " + answer);
 250  
         }
 251  0
         return answer;
 252  
     }
 253  
     
 254  
     protected synchronized void returnMessengerSession(MessengerSession session) {
 255  0
         if (log.isDebugEnabled()) {
 256  0
             log.debug("#### Returning messenger session: " + session);
 257  
         }
 258  0
         pool.addLast(session);
 259  0
     }
 260  
 
 261  
 }