Coverage Report - org.apache.commons.messenger.MessengerSupport
 
Classes in this File Line Coverage Branch Coverage Complexity
MessengerSupport
0%
0/319
0%
0/48
1.77
 
 1  
 /*
 2  
  * Copyright (C) The Apache Software Foundation. All rights reserved.
 3  
  *
 4  
  * This software is published under the terms of the Apache Software License
 5  
  * version 1.1, a copy of which has been included with this distribution in
 6  
  * the LICENSE file.
 7  
  *
 8  
  * $Id: MessengerSupport.java 155459 2005-02-26 13:24:44Z dirkv $
 9  
  */
 10  
 package org.apache.commons.messenger;
 11  
 
 12  
 import java.io.Serializable;
 13  
 import java.util.HashMap;
 14  
 import java.util.Map;
 15  
 
 16  
 import javax.jms.BytesMessage;
 17  
 import javax.jms.Connection;
 18  
 import javax.jms.ConnectionConsumer;
 19  
 import javax.jms.ConnectionFactory;
 20  
 import javax.jms.DeliveryMode;
 21  
 import javax.jms.Destination;
 22  
 import javax.jms.JMSException;
 23  
 import javax.jms.MapMessage;
 24  
 import javax.jms.Message;
 25  
 import javax.jms.MessageConsumer;
 26  
 import javax.jms.MessageListener;
 27  
 import javax.jms.MessageProducer;
 28  
 import javax.jms.ObjectMessage;
 29  
 import javax.jms.Queue;
 30  
 import javax.jms.QueueBrowser;
 31  
 import javax.jms.QueueConnection;
 32  
 import javax.jms.QueueSender;
 33  
 import javax.jms.QueueSession;
 34  
 import javax.jms.ServerSessionPool;
 35  
 import javax.jms.Session;
 36  
 import javax.jms.StreamMessage;
 37  
 import javax.jms.TextMessage;
 38  
 import javax.jms.Topic;
 39  
 import javax.jms.TopicConnection;
 40  
 import javax.jms.TopicPublisher;
 41  
 import javax.jms.TopicSession;
 42  
 
 43  
 import org.apache.commons.logging.Log;
 44  
 import org.apache.commons.logging.LogFactory;
 45  
 
 46  
 /** <p><code>MessengerSupport</code> is an abstract base class which implements
 47  
   * most of the functionality of Messenger. Derivations need to specify the
 48  
   * connection and session creation and the pooling strategy.</p>
 49  
   *
 50  
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 51  
   * @version $Revision: 155459 $
 52  
   */
 53  
 public abstract class MessengerSupport implements Messenger {
 54  
 
 55  
     /** Logger */
 56  0
     private static final Log log = LogFactory.getLog(MessengerSupport.class);
 57  0
     private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
 58  
 
 59  
     private static final boolean CACHE_REQUESTOR = true;
 60  
 
 61  
     /** The name of the Messenger */
 62  
     private String name;
 63  
 
 64  
     /** 
 65  
      * Whether Queues and Topics are looked up using JNDI (true)
 66  
      * or whether they should be created on the fly 
 67  
      */
 68  
     private boolean jndiDestinations;
 69  
 
 70  
     /** are topic subscribers durable? */
 71  
     private boolean durable;
 72  
 
 73  
     /** the delivery mode used by default when sending messages */
 74  0
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
 75  
 
 76  
     /** the durable name used for durable topic based subscriptions */
 77  
     private String durableName;
 78  
 
 79  
     /** 
 80  
      * whether local messages are ignored when topic based subscription is used
 81  
      * with a message selector 
 82  
      */
 83  
     private boolean noLocal;
 84  
 
 85  
     /** Should we cache the requestor object per thread? */
 86  
     private boolean cacheRequestors;
 87  
 
 88  
     /** A Map of ListenerKey objects to MessageConsumer objects */
 89  0
     private Map listeners = new HashMap();
 90  
 
 91  
     /** whether MessageProducer instances should be cached or not */
 92  0
     private boolean cacheProducers = true;
 93  
 
 94  0
     public MessengerSupport() {
 95  0
     }
 96  
 
 97  
     public String toString() {
 98  
         try {
 99  0
             MessengerSession session = borrowMessengerSession();
 100  0
             String answer = super.toString() + " session: " + session.toString();
 101  0
             returnMessengerSession(session);
 102  0
             return answer;
 103  
         }
 104  0
         catch (Exception e) {
 105  0
             return super.toString() + " session: " + e.toString();
 106  
         }
 107  
     }
 108  
 
 109  
     public Destination getDestination(String subject) throws JMSException {
 110  0
         MessengerSession messengerSession = borrowMessengerSession();
 111  
         try {
 112  0
             boolean debug = destinationLog.isInfoEnabled();
 113  0
             Session session = messengerSession.getSession();
 114  0
             if (messengerSession.isTopic()) {
 115  0
                 if (debug) {
 116  0
                     destinationLog.info("Using topic: " + subject);
 117  
                 }
 118  0
                 return getTopic((TopicSession) session, subject);
 119  
             }
 120  
             else {
 121  0
                 if (debug) {
 122  0
                     destinationLog.info("Using queue: " + subject);
 123  
                 }
 124  0
                 return getQueue((QueueSession) session, subject);
 125  
             }
 126  
         }
 127  
         finally {
 128  0
             returnMessengerSession(messengerSession);
 129  
         }
 130  
     }
 131  
 
 132  
     public Destination createTemporaryDestination() throws JMSException {
 133  0
         MessengerSession messengerSession = borrowMessengerSession();
 134  
         try {
 135  0
             Session session = messengerSession.getSession();
 136  0
             if (messengerSession.isTopic()) {
 137  0
                 TopicSession topicSession = (TopicSession) session;
 138  0
                 return topicSession.createTemporaryTopic();
 139  
             }
 140  
             else {
 141  0
                 QueueSession queueSession = (QueueSession) session;
 142  0
                 return queueSession.createTemporaryQueue();
 143  
             }
 144  
         }
 145  
         finally {
 146  0
             returnMessengerSession(messengerSession);
 147  
         }
 148  
     }
 149  
 
 150  
     public void send(Destination destination, Message message) throws JMSException {
 151  0
         MessengerSession session = borrowMessengerSession();
 152  
         try {
 153  0
             MessageProducer producer = session.getMessageProducer(destination);
 154  0
             if (session.isTopic()) {
 155  0
                 ((TopicPublisher) producer).publish((Topic) destination, message);
 156  
             }
 157  
             else {
 158  0
                 ((QueueSender) producer).send((Queue) destination, message);
 159  
             }
 160  
         }
 161  
         finally {
 162  0
             returnMessengerSession(session);
 163  0
         }
 164  0
     }
 165  
 
 166  
     public Message receive(Destination destination) throws JMSException {
 167  0
         MessengerSession session = borrowMessengerSession();
 168  0
         MessageConsumer consumer = null;
 169  
         try {
 170  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination);
 171  0
             return consumer.receive();
 172  
         }
 173  
         finally {
 174  0
             returnMessageConsumer(consumer);
 175  0
             returnMessengerSession(session);
 176  
         }
 177  
     }
 178  
 
 179  
     public Message receive(Destination destination, String selector) throws JMSException {
 180  0
         MessengerSession session = borrowMessengerSession();
 181  0
         MessageConsumer consumer = null;
 182  
         try {
 183  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
 184  0
             return consumer.receive();
 185  
         }
 186  
         finally {
 187  0
             returnMessageConsumer(consumer);
 188  0
             returnMessengerSession(session);
 189  
         }
 190  
     }
 191  
 
 192  
     public Message receive(Destination destination, long timeoutMillis) throws JMSException {
 193  0
         MessengerSession session = borrowMessengerSession();
 194  0
         MessageConsumer consumer = null;
 195  
         try {
 196  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination);
 197  0
             return consumer.receive(timeoutMillis);
 198  
         }
 199  
         finally {
 200  0
             returnMessageConsumer(consumer);
 201  0
             returnMessengerSession(session);
 202  
         }
 203  
     }
 204  
 
 205  
     public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
 206  0
         MessengerSession session = borrowMessengerSession();
 207  0
         MessageConsumer consumer = null;
 208  
         try {
 209  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
 210  0
             return consumer.receive(timeoutMillis);
 211  
         }
 212  
         finally {
 213  0
             returnMessageConsumer(consumer);
 214  0
             returnMessengerSession(session);
 215  
         }
 216  
     }
 217  
 
 218  
     public Message receiveNoWait(Destination destination) throws JMSException {
 219  0
         MessengerSession session = borrowMessengerSession();
 220  0
         MessageConsumer consumer = null;
 221  
         try {
 222  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination);
 223  0
             return consumer.receiveNoWait();
 224  
         }
 225  
         finally {
 226  0
             returnMessageConsumer(consumer);
 227  0
             returnMessengerSession(session);
 228  
         }
 229  
     }
 230  
 
 231  
     public Message receiveNoWait(Destination destination, String selector) throws JMSException {
 232  0
         MessengerSession session = borrowMessengerSession();
 233  0
         MessageConsumer consumer = null;
 234  
         try {
 235  0
             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
 236  0
             return consumer.receiveNoWait();
 237  
         }
 238  
         finally {
 239  0
             returnMessageConsumer(consumer);
 240  0
             returnMessengerSession(session);
 241  
         }
 242  
     }
 243  
 
 244  
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
 245  0
         MessengerSession session = borrowMessengerSession();
 246  
         try {
 247  0
             return createMessageConsumer(session, session.getSession(), destination);
 248  
         }
 249  
         finally {
 250  0
             returnMessengerSession(session);
 251  
         }
 252  
     }
 253  
 
 254  
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
 255  0
         MessengerSession session = borrowMessengerSession();
 256  
         try {
 257  0
             return createMessageConsumer(session, session.getSession(), destination, selector);
 258  
         }
 259  
         finally {
 260  0
             returnMessengerSession(session);
 261  
         }
 262  
     }
 263  
 
 264  
     public void run() {
 265  
         // don't return sessions which throw an exception
 266  
         try {
 267  0
             MessengerSession session = borrowMessengerSession();
 268  0
             session.getSession().run();
 269  0
             returnMessengerSession(session);
 270  
         }
 271  0
         catch (JMSException e) {
 272  
             // ### ignore
 273  0
         }
 274  0
     }
 275  
 
 276  
     public ConnectionConsumer createConnectionConsumer(
 277  
         Destination destination,
 278  
         ServerSessionPool sessionPool,
 279  
         int maxMessages)
 280  
         throws JMSException {
 281  0
         return createConnectionConsumer(destination, null, sessionPool, maxMessages);
 282  
     }
 283  
 
 284  
     public ConnectionConsumer createConnectionConsumer(
 285  
         Destination destination,
 286  
         String selector,
 287  
         ServerSessionPool sessionPool,
 288  
         int maxMessages)
 289  
         throws JMSException {
 290  0
         Connection connection = getConnection();
 291  0
         if (isTopic(connection)) {
 292  0
             TopicConnection topicConnection = (TopicConnection) connection;
 293  0
             if (isDurable()) {
 294  0
                 return topicConnection.createDurableConnectionConsumer(
 295  
                     (Topic) destination,
 296  
                     getDurableName(),
 297  
                     selector,
 298  
                     sessionPool,
 299  
                     maxMessages);
 300  
             }
 301  
             else {
 302  0
                 return topicConnection.createConnectionConsumer(
 303  
                     (Topic) destination,
 304  
                     selector,
 305  
                     sessionPool,
 306  
                     maxMessages);
 307  
             }
 308  
         }
 309  
         else {
 310  0
             QueueConnection queueConnection = (QueueConnection) connection;
 311  0
             return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages);
 312  
         }
 313  
     }
 314  
 
 315  
     /**
      * Supplies the JMS connection on which this class creates connection
      * consumers. How the connection is obtained is left to the derivation.
      *
      * @return the JMS connection to use
      * @throws JMSException if the connection cannot be provided
      */
     public abstract Connection getConnection() throws JMSException;
 316  
 
 317  
     // Listener API
 318  
     //-------------------------------------------------------------------------
 319  
     public void addListener(Destination destination, MessageListener listener) throws JMSException {
 320  0
         if (listener instanceof MessengerListener) {
 321  0
             MessengerListener messengerListener = (MessengerListener) listener;
 322  0
             messengerListener.setMessenger(this);
 323  
         }
 324  0
         MessengerSession session = borrowMessengerSession();
 325  
         try {
 326  0
             MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination);
 327  0
             consumer.setMessageListener(listener);
 328  0
             ListenerKey key = new ListenerKey(destination, listener);
 329  0
             listeners.put(key, consumer);
 330  
         }
 331  
         finally {
 332  0
             returnMessengerSession(session);
 333  0
         }
 334  0
     }
 335  
 
 336  
     public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
 337  
 
 338  0
         if (listener instanceof MessengerListener) {
 339  0
             MessengerListener messengerListener = (MessengerListener) listener;
 340  0
             messengerListener.setMessenger(this);
 341  
         }
 342  0
         MessengerSession session = borrowMessengerSession();
 343  
         try {
 344  0
             MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector);
 345  0
             consumer.setMessageListener(listener);
 346  0
             ListenerKey key = new ListenerKey(destination, listener, selector);
 347  0
             listeners.put(key, consumer);
 348  
         }
 349  
         finally {
 350  0
             returnMessengerSession(session);
 351  0
         }
 352  0
     }
 353  
 
 354  
     public void removeListener(Destination destination, MessageListener listener) throws JMSException {
 355  0
         ListenerKey key = new ListenerKey(destination, listener);
 356  0
         MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
 357  0
         if (consumer == null) {
 358  0
             throw new JMSException("The given listener object has not been added for the given destination");
 359  
         }
 360  0
         consumer.close();
 361  0
     }
 362  
 
 363  
     public void removeListener(Destination destination, String selector, MessageListener listener)
 364  
         throws JMSException {
 365  
 
 366  0
         ListenerKey key = new ListenerKey(destination, listener, selector);
 367  0
         MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
 368  0
         if (consumer == null) {
 369  0
             throw new JMSException("The given listener object has not been added for the given destination and selector");
 370  
         }
 371  0
         consumer.close();
 372  0
     }
 373  
 
 374  
     // Message factory methods
 375  
     //-------------------------------------------------------------------------
 376  
     public BytesMessage createBytesMessage() throws JMSException {
 377  0
         MessengerSession session = borrowMessengerSession();
 378  
         try {
 379  0
             return session.getSession().createBytesMessage();
 380  
         }
 381  
         finally {
 382  0
             returnMessengerSession(session);
 383  
         }
 384  
     }
 385  
 
 386  
     public MapMessage createMapMessage() throws JMSException {
 387  0
         MessengerSession session = borrowMessengerSession();
 388  
         try {
 389  0
             return session.getSession().createMapMessage();
 390  
         }
 391  
         finally {
 392  0
             returnMessengerSession(session);
 393  
         }
 394  
     }
 395  
 
 396  
     public Message createMessage() throws JMSException {
 397  0
         MessengerSession session = borrowMessengerSession();
 398  
         try {
 399  0
             return session.getSession().createMessage();
 400  
         }
 401  
         finally {
 402  0
             returnMessengerSession(session);
 403  
         }
 404  
     }
 405  
 
 406  
     public ObjectMessage createObjectMessage() throws JMSException {
 407  0
         MessengerSession session = borrowMessengerSession();
 408  
         try {
 409  0
             return session.getSession().createObjectMessage();
 410  
         }
 411  
         finally {
 412  0
             returnMessengerSession(session);
 413  
         }
 414  
     }
 415  
 
 416  
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
 417  0
         MessengerSession session = borrowMessengerSession();
 418  
         try {
 419  0
             return session.getSession().createObjectMessage(object);
 420  
         }
 421  
         finally {
 422  0
             returnMessengerSession(session);
 423  
         }
 424  
     }
 425  
 
 426  
     public StreamMessage createStreamMessage() throws JMSException {
 427  0
         MessengerSession session = borrowMessengerSession();
 428  
         try {
 429  0
             return session.getSession().createStreamMessage();
 430  
         }
 431  
         finally {
 432  0
             returnMessengerSession(session);
 433  
         }
 434  
     }
 435  
 
 436  
     public TextMessage createTextMessage() throws JMSException {
 437  0
         MessengerSession session = borrowMessengerSession();
 438  
         try {
 439  0
             return session.getSession().createTextMessage();
 440  
         }
 441  
         finally {
 442  0
             returnMessengerSession(session);
 443  
         }
 444  
     }
 445  
 
 446  
     public TextMessage createTextMessage(String text) throws JMSException {
 447  0
         MessengerSession session = borrowMessengerSession();
 448  
         try {
 449  0
             return session.getSession().createTextMessage(text);
 450  
         }
 451  
         finally {
 452  0
             returnMessengerSession(session);
 453  
         }
 454  
     }
 455  
 
 456  
     public void commit() throws JMSException {
 457  0
         MessengerSession session = borrowMessengerSession();
 458  
         try {
 459  0
             session.getSession().commit();
 460  
         }
 461  
         finally {
 462  0
             returnMessengerSession(session);
 463  0
         }
 464  0
     }
 465  
 
 466  
     public void rollback() throws JMSException {
 467  0
         MessengerSession session = borrowMessengerSession();
 468  
         try {
 469  0
             session.getSession().rollback();
 470  
         }
 471  
         finally {
 472  0
             returnMessengerSession(session);
 473  0
         }
 474  0
     }
 475  
 
 476  
     public void close() throws JMSException {
 477  0
         getSessionFactory().close();
 478  0
     }
 479  
 
 480  
     /**
 481  
      * Creates a browser on the given Queue
 482  
      */
 483  
     public QueueBrowser createBrowser(Destination destination) throws JMSException {
 484  0
         MessengerSession session = borrowMessengerSession();
 485  0
         QueueBrowser browser = null;
 486  
         try {
 487  0
             return createBrowser(session, destination);
 488  
         }
 489  
         finally {
 490  0
             returnMessengerSession(session);
 491  
         }
 492  
     }
 493  
 
 494  
     /** Get the producer's default delivery mode. */
 495  
     public int getDeliveryMode(Destination destination) throws JMSException {
 496  0
         MessengerSession session = borrowMessengerSession();
 497  0
         int deliveryMode = 0;
 498  
         try {
 499  0
             MessageProducer producer = session.getMessageProducer(destination);
 500  0
             deliveryMode = producer.getDeliveryMode();
 501  
         }
 502  
         finally {
 503  0
             returnMessengerSession(session);
 504  0
         }
 505  0
         return deliveryMode;
 506  
     }
 507  
 
 508  
     /** Set the producer's default delivery mode. */
 509  
     public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException {
 510  0
         MessengerSession session = borrowMessengerSession();
 511  0
         MessageProducer producer = null;
 512  
         try {
 513  0
             producer = session.getMessageProducer(destination);
 514  0
             producer.setDeliveryMode(deliveryMode);
 515  
         }
 516  
         finally {
 517  0
             returnMessengerSession(session);
 518  0
         }
 519  0
     }
 520  
 
 521  
     /**  Get the producer's default priority. */
 522  
     public int getPriority(Destination destination) throws JMSException {
 523  0
         MessengerSession session = borrowMessengerSession();
 524  0
         MessageProducer producer = null;
 525  0
         int priority = 0;
 526  
         try {
 527  0
             producer = session.getMessageProducer(destination);
 528  0
             priority = producer.getPriority();
 529  
         }
 530  
         finally {
 531  
 
 532  0
             returnMessengerSession(session);
 533  0
         }
 534  0
         return priority;
 535  
     }
 536  
 
 537  
     /**   Set the producer's default priority. */
 538  
     public void setPriority(Destination destination, int priority) throws JMSException {
 539  0
         MessengerSession session = borrowMessengerSession();
 540  0
         MessageProducer producer = null;
 541  
         try {
 542  0
             producer = session.getMessageProducer(destination);
 543  0
             producer.setPriority(priority);
 544  
         }
 545  
         finally {
 546  0
             returnMessengerSession(session);
 547  0
         }
 548  0
     }
 549  
 
 550  
     /**  Get the producer's default delivery mode. */
 551  
     public long getTimeToLive(Destination destination) throws JMSException {
 552  0
         MessengerSession session = borrowMessengerSession();
 553  0
         long timeToLive = 0;
 554  
         try {
 555  0
             MessageProducer producer = session.getMessageProducer(destination);
 556  0
             timeToLive = producer.getTimeToLive();
 557  
         }
 558  
         finally {
 559  0
             returnMessengerSession(session);
 560  0
         }
 561  0
         return timeToLive;
 562  
     }
 563  
 
 564  
     /**  <p>Set the default length of time in milliseconds from its dispatch time that
 565  
      *   a produced message should be retained by the message system.</p>
 566  
      */
 567  
     public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
 568  0
         MessengerSession session = borrowMessengerSession();
 569  
         try {
 570  0
             MessageProducer producer = session.getMessageProducer(destination);
 571  0
             producer.setTimeToLive(timeToLive);
 572  
         }
 573  
         finally {
 574  0
             returnMessengerSession(session);
 575  0
         }
 576  0
     }
 577  
 
 578  
     /** Get an indication of whether message timestamps are disabled. */
 579  
     public boolean getDisableMessageTimestamp(Destination destination) throws JMSException {
 580  0
         MessengerSession session = borrowMessengerSession();
 581  0
         boolean value = false;
 582  
         try {
 583  0
             MessageProducer producer = session.getMessageProducer(destination);
 584  0
             value = producer.getDisableMessageTimestamp();
 585  
         }
 586  
         finally {
 587  0
             returnMessengerSession(session);
 588  0
         }
 589  0
         return value;
 590  
     }
 591  
     
 592  
     /** Set whether message timestamps are disabled. */
 593  
     public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
 594  0
         MessengerSession session = borrowMessengerSession();
 595  
         try {
 596  0
             MessageProducer producer = session.getMessageProducer(destination);
 597  0
             producer.setDisableMessageTimestamp(value);
 598  
         }
 599  
         finally {
 600  0
             returnMessengerSession(session);
 601  0
         }
 602  0
     }
 603  
 
 604  
     /** Extends the send capability to send by specifying additional options. */
 605  
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
 606  
         throws JMSException {
 607  0
         MessengerSession session = borrowMessengerSession();
 608  
         try {
 609  0
             MessageProducer producer = session.getMessageProducer(destination);
 610  0
             if (session.isTopic()) {
 611  0
                 ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive);
 612  
             }
 613  
             else {
 614  0
                 ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive);
 615  
             }
 616  
         }
 617  
         finally {
 618  0
             returnMessengerSession(session);
 619  0
         }
 620  0
     }
 621  
 
 622  
     /**  Get an indication of whether message IDs are disabled. */
 623  
     public boolean getDisableMessageID(Destination destination) throws JMSException {
 624  0
         MessengerSession session = borrowMessengerSession();
 625  0
         boolean value = false;
 626  
         try {
 627  0
             MessageProducer producer = session.getMessageProducer(destination);
 628  0
             value = producer.getDisableMessageID();
 629  
         }
 630  
         finally {
 631  0
             returnMessengerSession(session);
 632  0
         }
 633  0
         return value;
 634  
     }
 635  
 
 636  
     /** Set whether message IDs are disabled. */
 637  
     public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
 638  0
         MessengerSession session = borrowMessengerSession();
 639  
         try {
 640  0
             MessageProducer producer = session.getMessageProducer(destination);
 641  0
             producer.setDisableMessageID(value);
 642  
         }
 643  
         finally {
 644  0
             returnMessengerSession(session);
 645  0
         }
 646  0
     }
 647  
 
 648  
     // Properties
 649  
     //-------------------------------------------------------------------------
 650  
     /** Gets the name that this Messenger is called in a MessengerManager */
 651  
     public String getName() {
 652  0
         return name;
 653  
     }
 654  
 
 655  
     /** Sets the name that this Messenger is called in a MessengerManager */
 656  
     public void setName(String name) {
 657  0
         this.name = name;
 658  0
     }
 659  
 
 660  
     /** Setter for jndiDestinations */
 661  
     public void setJndiDestinations(boolean jndiDestinations) {
 662  0
         this.jndiDestinations = jndiDestinations;
 663  0
     }
 664  
 
 665  
     /** Getter for jndiDestinations */
 666  
     public boolean isJndiDestinations() {
 667  0
         return jndiDestinations;
 668  
     }
 669  
 
 670  
     /** Gets whether topic subscribers are durable or not */
 671  
     public boolean isDurable() {
 672  0
         return durable;
 673  
     }
 674  
 
 675  
     /** Sets whether topic subscribers are durable or not */
 676  
     public void setDurable(boolean durable) {
 677  0
         this.durable = durable;
 678  0
     }
 679  
 
 680  
     /** Gets whether we should cache the requestor object per thread? */
 681  
     public boolean isCacheRequestors() {
 682  0
         return cacheRequestors;
 683  
     }
 684  
 
 685  
     /** Sets whether we should cache the requestor object per thread? */
 686  
     public void setCacheRequestors(boolean cacheRequestors) {
 687  0
         this.cacheRequestors = cacheRequestors;
 688  0
     }
 689  
 
 690  
     /** @return the durable name used for durable topic based subscriptions */
 691  
     public String getDurableName() {
 692  0
         return durableName;
 693  
     }
 694  
 
 695  
     /** Sets the durable name used for durable topic based subscriptions */
 696  
     public void setDurableName(String durableName) {
 697  0
         this.durableName = durableName;
 698  0
     }
 699  
 
 700  
     /** 
 701  
      * Gets whether local messages are ignored when topic based subscription is used
 702  
      * with a message selector 
 703  
      */
 704  
     public boolean isNoLocal() {
 705  0
         return noLocal;
 706  
     }
 707  
 
 708  
     /** 
 709  
      * Sets whether local messages are ignored when topic based subscription is used
 710  
      * with a message selector 
 711  
      */
 712  
     public void setNoLocal(boolean noLocal) {
 713  0
         this.noLocal = noLocal;
 714  0
     }
 715  
 
 716  
     /** Gets whether MessageProducer instances should be cached or not, which defaults to true */
 717  
     public boolean isCacheProducers() {
 718  0
         return cacheProducers;
 719  
     }
 720  
 
 721  
     /** Sets whether MessageProducer instances should be cached or not, which defaults to true */
 722  
     public void setCacheProducers(boolean cacheProducers) {
 723  0
         this.cacheProducers = cacheProducers;
 724  0
     }
 725  
 
 726  
     /**
 727  
      * Returns the delivery mode used on messages sent via this Messenger
 728  
      * @return int
 729  
      */
 730  
     public int getDeliveryMode() {
 731  0
         return deliveryMode;
 732  
     }
 733  
 
 734  
     /**
 735  
      * Sets the delivery mode used on messages sent via this Messenger
 736  
      * @param deliveryMode The deliveryMode to set
 737  
      */
 738  
     public void setDeliveryMode(int deliveryMode) {
 739  0
         this.deliveryMode = deliveryMode;
 740  0
     }
 741  
 
 742  
     /**
 743  
      * Sets whether message delivery should be persistent or not
 744  
      * 
 745  
      * @param persistentDelivery
 746  
      */
 747  
     public void setPersistentDelivery(boolean persistentDelivery) {
 748  0
         if (persistentDelivery) {
 749  0
             setDeliveryMode(DeliveryMode.PERSISTENT);
 750  
         }
 751  
         else {
 752  0
             setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 753  
         }
 754  0
     }
 755  
 
 756  
     // Implementation methods
 757  
     //-------------------------------------------------------------------------
 758  
 
 759  
     /**
      * Subclass hook that supplies the MessengerSession to work with.
      * The class documentation leaves session creation and the pooling
      * strategy to derivations, so how a session is obtained is up to the
      * implementation. Presumably paired with
      * <code>returnMessengerSession</code> -- confirm against callers.
      *
      * @return the session to use
      * @throws JMSException declared for implementations that fail to provide a session
      */
     protected abstract MessengerSession borrowMessengerSession() throws JMSException;
 760  
 
 761  
     /**
      * Subclass hook that takes back a MessengerSession. What happens to the
      * session (pooling, closing, ...) is decided by the implementation.
      * Presumably the counterpart of <code>borrowMessengerSession</code> --
      * confirm against callers.
      *
      * @param session the session being handed back
      */
     protected abstract void returnMessengerSession(MessengerSession session);
 762  
 
 763  
     /**
      * Subclass hook; presumably reports whether the given connection is topic
      * (publish/subscribe) based rather than queue based -- confirm against
      * implementations.
      *
      * @param connection the JMS connection to inspect
      * @return the implementation's answer for this connection
      * @throws JMSException declared for implementations that cannot determine the answer
      */
     protected abstract boolean isTopic(Connection connection) throws JMSException;
 764  
 
 765  
     /**
      * Subclass hook; presumably reports whether the given connection factory
      * is topic (publish/subscribe) based rather than queue based -- confirm
      * against implementations.
      *
      * @param factory the JMS connection factory to inspect
      * @return the implementation's answer for this factory
      * @throws JMSException declared for implementations that cannot determine the answer
      */
     protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
 766  
 
 767  
     /** @return a newly created message producer for the given session and destination */
 768  
     protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException {
 769  
 
 770  0
         MessageProducer answer = null;
 771  0
         Session session = messengerSession.getSession();
 772  0
         if (messengerSession.isTopic()) {
 773  0
             TopicSession topicSession = (TopicSession) session;
 774  0
             answer = topicSession.createPublisher((Topic) destination);
 775  0
         }
 776  
         else {
 777  0
             QueueSession queueSession = (QueueSession) session;
 778  0
             answer = queueSession.createSender((Queue) destination);
 779  
         }
 780  
 
 781  
         // configure the MessageProducer
 782  0
         if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
 783  0
             answer.setDeliveryMode(deliveryMode);
 784  
         }
 785  0
         return answer;
 786  
     }
 787  
 
 788  
     /** @return a MessageConsumer for the given session and destination */
 789  
     protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
 790  
         throws JMSException {
 791  
 
 792  0
         MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination);
 793  
 
 794  0
         if (log.isDebugEnabled()) {
 795  0
             log.debug("Created new consumer: " + consumer + " on destination: " + destination);
 796  
         }
 797  
 
 798  0
         return consumer;
 799  
     }
 800  
 
 801  
     /** @return a MessageConsumer for the given session, destination and selector */
 802  
     protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector)
 803  
         throws JMSException {
 804  
 
 805  0
         MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector);
 806  
 
 807  0
         if (log.isDebugEnabled()) {
 808  0
             log.debug(
 809  
                 "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector);
 810  
         }
 811  
 
 812  0
         return consumer;
 813  
     }
 814  
 
 815  
     /** 
 816  
      * Returns a message consumer back to the pool. 
 817  
      * By default this method will close message consumers though we should
 818  
      * be able to cache then
 819  
      */
 820  
     protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
 821  0
         if (log.isDebugEnabled()) {
 822  0
             log.debug("Closing consumer: " + messageConsumer);
 823  
         }
 824  
 
 825  0
         if (messageConsumer != null) {
 826  0
             messageConsumer.close();
 827  
         }
 828  0
     }
 829  
 
 830  
     /** @return a new MessageConsumer for the given session and destination */
 831  
     protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
 832  
         throws JMSException {
 833  0
         if (messengerSession.isTopic()) {
 834  0
             TopicSession topicSession = (TopicSession) session;
 835  0
             if (isDurable()) {
 836  0
                 return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal());
 837  
             }
 838  
             else {
 839  0
                 return topicSession.createSubscriber((Topic) destination, null, isNoLocal());
 840  
             }
 841  
         }
 842  
         else {
 843  0
             QueueSession queueSession = (QueueSession) session;
 844  0
             return queueSession.createReceiver((Queue) destination);
 845  
         }
 846  
     }
 847  
 
 848  
     /** @return a new MessageConsumer for the given session, destination and selector */
 849  
     protected MessageConsumer createMessageConsumer(
 850  
         MessengerSession messengerSession,
 851  
         Session session,
 852  
         Destination destination,
 853  
         String selector)
 854  
         throws JMSException {
 855  0
         if (messengerSession.isTopic()) {
 856  0
             TopicSession topicSession = (TopicSession) session;
 857  0
             if (isDurable()) {
 858  0
                 return topicSession.createDurableSubscriber(
 859  
                     (Topic) destination,
 860  
                     getDurableName(),
 861  
                     selector,
 862  
                     isNoLocal());
 863  
             }
 864  
             else {
 865  0
                 return topicSession.createSubscriber((Topic) destination, selector, isNoLocal());
 866  
             }
 867  
         }
 868  
         else {
 869  0
             QueueSession queueSession = (QueueSession) session;
 870  0
             return queueSession.createReceiver((Queue) destination, selector);
 871  
         }
 872  
     }
 873  
 
 874  
     /** @return a new QueueBrowser for the given session and destination */
 875  
     protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException {
 876  0
         if (session.isTopic()) {
 877  0
             return null;
 878  
         }
 879  
         else {
 880  0
             QueueSession queueSession = (QueueSession) session.getSession();
 881  0
             return queueSession.createBrowser((Queue) destination);
 882  
         }
 883  
     }
 884  
 
 885  
     protected Queue getQueue(QueueSession session, String subject) throws JMSException {
 886  
         // XXXX: might want to cache
 887  0
         return session.createQueue(subject);
 888  
     }
 889  
 
 890  
     protected Topic getTopic(TopicSession session, String subject) throws JMSException {
 891  
         // XXXX: might want to cache
 892  0
         return session.createTopic(subject);
 893  
     }
 894  
 }