View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom;
19  
20  import java.io.Serializable;
21  import java.util.Properties;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.TimeUnit;
24  
25  import javax.jms.Connection;
26  import javax.jms.ConnectionFactory;
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import javax.jms.MapMessage;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageProducer;
33  import javax.jms.Session;
34  import javax.naming.NamingException;
35  
36  import org.apache.logging.log4j.core.LogEvent;
37  import org.apache.logging.log4j.core.appender.AbstractManager;
38  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
39  import org.apache.logging.log4j.core.appender.ManagerFactory;
40  import org.apache.logging.log4j.core.net.JndiManager;
41  import org.apache.logging.log4j.core.util.Log4jThread;
42  import org.apache.logging.log4j.status.StatusLogger;
43  import org.apache.logging.log4j.util.BiConsumer;
44  
45  /**
46   * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
47   *
48   * <p>
49   * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
50   * involving a configured ConnectionFactory and Destination.
51   * </p>
52   */
53  public class JmsManager extends AbstractManager {
54  
55      public static class JmsManagerConfiguration {
56          private final Properties jndiProperties;
57          private final String connectionFactoryName;
58          private final String destinationName;
59          private final String userName;
60          private final char[] password;
61          private final boolean immediateFail;
62          private final boolean retry;
63          private final long reconnectIntervalMillis;
64  
65          JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
66                  final String destinationName, final String userName, final char[] password, final boolean immediateFail,
67                  final long reconnectIntervalMillis) {
68              this.jndiProperties = jndiProperties;
69              this.connectionFactoryName = connectionFactoryName;
70              this.destinationName = destinationName;
71              this.userName = userName;
72              this.password = password;
73              this.immediateFail = immediateFail;
74              this.reconnectIntervalMillis = reconnectIntervalMillis;
75              this.retry = reconnectIntervalMillis > 0;
76          }
77  
78          public String getConnectionFactoryName() {
79              return connectionFactoryName;
80          }
81  
82          public String getDestinationName() {
83              return destinationName;
84          }
85  
86          public JndiManager getJndiManager() {
87              return JndiManager.getJndiManager(getJndiProperties());
88          }
89  
90          public Properties getJndiProperties() {
91              return jndiProperties;
92          }
93  
94          public char[] getPassword() {
95              return password;
96          }
97  
98          public long getReconnectIntervalMillis() {
99              return reconnectIntervalMillis;
100         }
101 
102         public String getUserName() {
103             return userName;
104         }
105 
106         public boolean isImmediateFail() {
107             return immediateFail;
108         }
109 
110         public boolean isRetry() {
111             return retry;
112         }
113 
114         @Override
115         public String toString() {
116             return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117                     + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118                     + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119                     + reconnectIntervalMillis + "]";
120         }
121 
122     }
123 
124     private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125 
126         @Override
127         public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128             try {
129                 return new JmsManager(name, data);
130             } catch (final Exception e) {
131                 logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
132                 return null;
133             }
134         }
135     }
136 
137     /**
138      * Handles reconnecting to JMS on a Thread.
139      */
140     private class Reconnector extends Log4jThread {
141 
142         private final CountDownLatch latch = new CountDownLatch(1);
143 
144         private volatile boolean shutdown = false;
145 
146         private final Object owner;
147 
148         private Reconnector(final Object owner) {
149             super("JmsManager-Reconnector");
150             this.owner = owner;
151         }
152 
153         public void latch() {
154             try {
155                 latch.await();
156             } catch (final InterruptedException ex) {
157                 // Ignore the exception.
158             }
159         }
160 
161         void reconnect() throws NamingException, JMSException {
162             final JndiManager jndiManager2 = getJndiManager();
163             final Connection connection2 = createConnection(jndiManager2);
164             final Session session2 = createSession(connection2);
165             final Destination destination2 = createDestination(jndiManager2);
166             final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
167             connection2.start();
168             synchronized (owner) {
169                 jndiManager = jndiManager2;
170                 connection = connection2;
171                 session = session2;
172                 destination = destination2;
173                 messageProducer = messageProducer2;
174                 reconnector = null;
175                 shutdown = true;
176             }
177             logger().debug("Connection reestablished to {}", configuration);
178         }
179 
180         @Override
181         public void run() {
182             while (!shutdown) {
183                 try {
184                     sleep(configuration.getReconnectIntervalMillis());
185                     reconnect();
186                 } catch (final InterruptedException | JMSException | NamingException e) {
187                     logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
188                             e);
189                 } finally {
190                     latch.countDown();
191                 }
192             }
193         }
194 
195         public void shutdown() {
196             shutdown = true;
197         }
198 
199     }
200 
201     static final JmsManagerFactory FACTORY = new JmsManagerFactory();
202 
203     /**
204      * Gets a JmsManager using the specified configuration parameters.
205      *
206      * @param name
207      *            The name to use for this JmsManager.
208      * @param connectionFactoryName
209      *            The binding name for the {@link javax.jms.ConnectionFactory}.
210      * @param destinationName
211      *            The binding name for the {@link javax.jms.Destination}.
212      * @param userName
213      *            The userName to connect with or {@code null} for no authentication.
214      * @param password
215      *            The password to use with the given userName or {@code null} for no authentication.
216      * @param immediateFail
217      *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
218      *            fails.
219      * @param reconnectIntervalMillis
220      *            How to log sleep in milliseconds before trying to reconnect to JMS.
221      * @param jndiProperties
222      *            JNDI properties.
223      * @return The JmsManager as configured.
224      */
225     public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
226             final String connectionFactoryName, final String destinationName, final String userName,
227             final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
228         final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
229                 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
230         return getManager(name, FACTORY, configuration);
231     }
232 
233     private final JmsManagerConfiguration configuration;
234 
235     private volatile Reconnector reconnector;
236     private volatile JndiManager jndiManager;
237     private volatile Connection connection;
238     private volatile Session session;
239     private volatile Destination destination;
240     private volatile MessageProducer messageProducer;
241 
242     private JmsManager(final String name, final JmsManagerConfiguration configuration) {
243         super(null, name);
244         this.configuration = configuration;
245         this.jndiManager = configuration.getJndiManager();
246         try {
247             this.connection = createConnection(this.jndiManager);
248             this.session = createSession(this.connection);
249             this.destination = createDestination(this.jndiManager);
250             this.messageProducer = createMessageProducer(this.session, this.destination);
251             this.connection.start();
252         } catch (NamingException | JMSException e) {
253             this.reconnector = createReconnector();
254             this.reconnector.start();
255         }
256     }
257 
258     private boolean closeConnection() {
259         if (connection == null) {
260             return true;
261         }
262         final Connection temp = connection;
263         connection = null;
264         try {
265             temp.close();
266             return true;
267         } catch (final JMSException e) {
268             StatusLogger.getLogger().debug(
269                     "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
270                     e.getLocalizedMessage(), temp, e);
271             return false;
272         }
273     }
274 
275     private boolean closeJndiManager() {
276         if (jndiManager == null) {
277             return true;
278         }
279         final JndiManager tmp = jndiManager;
280         jndiManager = null;
281         tmp.close();
282         return true;
283     }
284 
285     private boolean closeMessageProducer() {
286         if (messageProducer == null) {
287             return true;
288         }
289         final MessageProducer temp = messageProducer;
290         messageProducer = null;
291         try {
292             temp.close();
293             return true;
294         } catch (final JMSException e) {
295             StatusLogger.getLogger().debug(
296                     "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
297                     e.getLocalizedMessage(), temp, e);
298             return false;
299         }
300     }
301 
302     private boolean closeSession() {
303         if (session == null) {
304             return true;
305         }
306         final Session temp = session;
307         session = null;
308         try {
309             temp.close();
310             return true;
311         } catch (final JMSException e) {
312             StatusLogger.getLogger().debug(
313                     "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
314                     e.getLocalizedMessage(), temp, e);
315             return false;
316         }
317     }
318 
319     private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
320         final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
321         if (configuration.getUserName() != null && configuration.getPassword() != null) {
322             return connectionFactory.createConnection(configuration.getUserName(),
323                     configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
324         }
325         return connectionFactory.createConnection();
326 
327     }
328 
329     private Destination createDestination(final JndiManager jndiManager) throws NamingException {
330         return jndiManager.lookup(configuration.getDestinationName());
331     }
332 
333     /**
334      * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
335      * <p>
336      * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
337      * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
338      * message will be serialized to a String.
339      * </p>
340      * <p>
341      * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
342      * will be serialized as a Java object.
343      * </p>
344      * <p>
345      * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
346      * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
347      * </p>
348      *
349      * @param object
350      *            The LogEvent or String message to wrap.
351      * @return A new JMS message containing the provided object.
352      * @throws JMSException
353      */
354     public Message createMessage(final Serializable object) throws JMSException {
355         if (object instanceof String) {
356             return this.session.createTextMessage((String) object);
357         } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
358             return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
359         }
360         return this.session.createObjectMessage(object);
361     }
362 
363     private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
364         final Message message = createMessage(serializable);
365         message.setJMSTimestamp(event.getTimeMillis());
366         messageProducer.send(message);
367     }
368 
369     /**
370      * Creates a MessageConsumer on this Destination using the current Session.
371      *
372      * @return A MessageConsumer on this Destination.
373      * @throws JMSException
374      */
375     public MessageConsumer createMessageConsumer() throws JMSException {
376         return this.session.createConsumer(this.destination);
377     }
378 
379     /**
380      * Creates a MessageProducer on this Destination using the current Session.
381      *
382      * @param session
383      *            The JMS Session to use to create the MessageProducer
384      * @param destination
385      *            The JMS Destination for the MessageProducer
386      * @return A MessageProducer on this Destination.
387      * @throws JMSException
388      */
389     public MessageProducer createMessageProducer(final Session session, final Destination destination)
390             throws JMSException {
391         return session.createProducer(destination);
392     }
393 
394     private Reconnector createReconnector() {
395         final Reconnector recon = new Reconnector(this);
396         recon.setDaemon(true);
397         recon.setPriority(Thread.MIN_PRIORITY);
398         return recon;
399     }
400 
401     private Session createSession(final Connection connection) throws JMSException {
402         return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
403     }
404 
405     public JmsManagerConfiguration getJmsManagerConfiguration() {
406         return configuration;
407     }
408 
409     JndiManager getJndiManager() {
410         return configuration.getJndiManager();
411     }
412 
413     <T> T lookup(final String destinationName) throws NamingException {
414         return this.jndiManager.lookup(destinationName);
415     }
416 
417     private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
418             final MapMessage jmsMapMessage) {
419         // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
420         log4jMapMessage.forEach(new BiConsumer<String, Object>() {
421             @Override
422             public void accept(final String key, final Object value) {
423                 try {
424                     jmsMapMessage.setObject(key, value);
425                 } catch (final JMSException e) {
426                     throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
427                             e.getClass(), key, value, e.getLocalizedMessage()), e);
428                 }
429             }
430         });
431         return jmsMapMessage;
432     }
433 
434     @Override
435     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
436         if (reconnector != null) {
437             reconnector.shutdown();
438             reconnector.interrupt();
439             reconnector = null;
440         }
441         boolean closed = false;
442         closed &= closeJndiManager();
443         closed &= closeMessageProducer();
444         closed &= closeSession();
445         closed &= closeConnection();
446         return closed && this.jndiManager.stop(timeout, timeUnit);
447     }
448 
449     void send(final LogEvent event, final Serializable serializable) {
450         if (messageProducer == null) {
451             if (reconnector != null && !configuration.isImmediateFail()) {
452                 reconnector.latch();
453                 if (messageProducer == null) {
454                     throw new AppenderLoggingException(
455                             "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
456                 }
457             }
458         }
459         synchronized (this) {
460             try {
461                 createMessageAndSend(event, serializable);
462             } catch (final JMSException causeEx) {
463                 if (configuration.isRetry() && reconnector == null) {
464                     reconnector = createReconnector();
465                     try {
466                         closeJndiManager();
467                         reconnector.reconnect();
468                     } catch (NamingException | JMSException reconnEx) {
469                         logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
470                                 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
471                         reconnector.start();
472                         throw new AppenderLoggingException(
473                                 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
474                     }
475                     try {
476                         createMessageAndSend(event, serializable);
477                     } catch (final JMSException e) {
478                         throw new AppenderLoggingException(
479                                 String.format("Error sending to %s after reestablishing JMS connection for %s",
480                                         getName(), configuration),
481                                 causeEx);
482                     }
483                 }
484             }
485         }
486     }
487 
488 }