View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom;
19  
20  import java.io.Serializable;
21  import java.util.Properties;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.TimeUnit;
24  
25  import javax.jms.Connection;
26  import javax.jms.ConnectionFactory;
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import javax.jms.MapMessage;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageProducer;
33  import javax.jms.Session;
34  import javax.naming.NamingException;
35  
36  import org.apache.logging.log4j.Logger;
37  import org.apache.logging.log4j.core.LogEvent;
38  import org.apache.logging.log4j.core.appender.AbstractManager;
39  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
40  import org.apache.logging.log4j.core.appender.ManagerFactory;
41  import org.apache.logging.log4j.core.net.JndiManager;
42  import org.apache.logging.log4j.core.util.Log4jThread;
43  import org.apache.logging.log4j.status.StatusLogger;
44  import org.apache.logging.log4j.util.BiConsumer;
45  
46  /**
47   * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
48   *
49   * <p>
50   * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
51   * involving a configured ConnectionFactory and Destination.
52   * </p>
53   */
54  public class JmsManager extends AbstractManager {
55  
56      public static class JmsManagerConfiguration {
57          private final Properties jndiProperties;
58          private final String connectionFactoryName;
59          private final String destinationName;
60          private final String userName;
61          private final char[] password;
62          private final boolean immediateFail;
63          private final boolean retry;
64          private final long reconnectIntervalMillis;
65  
66          JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
67                  final String destinationName, final String userName, final char[] password, final boolean immediateFail,
68                  final long reconnectIntervalMillis) {
69              this.jndiProperties = jndiProperties;
70              this.connectionFactoryName = connectionFactoryName;
71              this.destinationName = destinationName;
72              this.userName = userName;
73              this.password = password;
74              this.immediateFail = immediateFail;
75              this.reconnectIntervalMillis = reconnectIntervalMillis;
76              this.retry = reconnectIntervalMillis > 0;
77          }
78  
79          public String getConnectionFactoryName() {
80              return connectionFactoryName;
81          }
82  
83          public String getDestinationName() {
84              return destinationName;
85          }
86  
87          public JndiManager getJndiManager() {
88              return JndiManager.getJndiManager(getJndiProperties());
89          }
90  
91          public Properties getJndiProperties() {
92              return jndiProperties;
93          }
94  
95          public char[] getPassword() {
96              return password;
97          }
98  
99          public long getReconnectIntervalMillis() {
100             return reconnectIntervalMillis;
101         }
102 
103         public String getUserName() {
104             return userName;
105         }
106 
107         public boolean isImmediateFail() {
108             return immediateFail;
109         }
110 
111         public boolean isRetry() {
112             return retry;
113         }
114 
115         @Override
116         public String toString() {
117             return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
118                     + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
119                     + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
120                     + reconnectIntervalMillis + "]";
121         }
122 
123     }
124 
125     private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
126 
127         @Override
128         public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
129             try {
130                 return new JmsManager(name, data);
131             } catch (final Exception e) {
132                 LOGGER.error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
133                 return null;
134             }
135         }
136     }
137 
138     /**
139      * Handles reconnecting to a Socket on a Thread.
140      */
141     private class Reconnector extends Log4jThread {
142 
143         private final CountDownLatch latch = new CountDownLatch(1);
144 
145         private volatile boolean shutdown = false;
146 
147         private final Object owner;
148 
149         public Reconnector(final Object owner) {
150             super("JmsManager-Reconnector");
151             this.owner = owner;
152         }
153 
154         public void latch() {
155             try {
156                 latch.await();
157             } catch (final InterruptedException ex) {
158                 // Ignore the exception.
159             }
160         }
161 
162         void reconnect() throws NamingException, JMSException {
163             final JndiManager jndiManager2 = getJndiManager();
164             final Connection connection2 = createConnection(jndiManager2);
165             final Session session2 = createSession(connection2);
166             final Destination destination2 = createDestination(jndiManager2);
167             final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
168             connection2.start();
169             synchronized (owner) {
170                 jndiManager = jndiManager2;
171                 connection = connection2;
172                 session = session2;
173                 destination = destination2;
174                 messageProducer = messageProducer2;
175                 reconnector = null;
176                 shutdown = true;
177             }
178             LOGGER.debug("Connection reestablished to {}", configuration);
179         }
180 
181         @Override
182         public void run() {
183             while (!shutdown) {
184                 try {
185                     sleep(configuration.getReconnectIntervalMillis());
186                     reconnect();
187                 } catch (final InterruptedException | JMSException | NamingException e) {
188                     LOGGER.debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
189                             e);
190                 } finally {
191                     latch.countDown();
192                 }
193             }
194         }
195 
196         public void shutdown() {
197             shutdown = true;
198         }
199 
200     }
201 
202     private static final Logger LOGGER = StatusLogger.getLogger(); // Status logger used for this class's own diagnostics.
203 
204     static final JmsManagerFactory FACTORY = new JmsManagerFactory(); // Factory handed to getManager(...) by getJmsManager(...).
205 
206     /**
207      * Gets a JmsManager using the specified configuration parameters.
208      *
209      * @param name
210      *            The name to use for this JmsManager.
211      * @param connectionFactoryName
212      *            The binding name for the {@link javax.jms.ConnectionFactory}.
213      * @param destinationName
214      *            The binding name for the {@link javax.jms.Destination}.
215      * @param userName
216      *            The userName to connect with or {@code null} for no authentication.
217      * @param password
218      *            The password to use with the given userName or {@code null} for no authentication.
219      * @param immediateFail
220      *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
221      *            fails.
222      * @param reconnectIntervalMillis
223      *            How to log sleep in milliseconds before trying to reconnect to JMS.
224      * @param jndiManager
225      *            The JndiManager to look up JMS information through.
226      * @return The JmsManager as configured.
227      */
228     public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
229             final String connectionFactoryName, final String destinationName, final String userName,
230             final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
231         final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
232                 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
233         return getManager(name, FACTORY, configuration);
234     }
235 
236     private final JmsManagerConfiguration configuration; // Settings this manager was created with; read again on every reconnect.
237 
238     private volatile Reconnector reconnector; // Pending reconnect helper; set to null by a successful reconnect and by releaseSub().
239     private volatile JndiManager jndiManager; // JNDI manager used for lookups; set to null by closeJndiManager().
240     private volatile Connection connection; // Current JMS connection; set to null by closeConnection().
241     private volatile Session session; // Current JMS session; set to null by closeSession().
242     private volatile Destination destination; // Destination looked up under the configured destination name.
243     private volatile MessageProducer messageProducer; // Producer used by send(); set to null by closeMessageProducer().
244 
245     private JmsManager(final String name, final JmsManagerConfiguration configuration) {
246         super(null, name);
247         this.configuration = configuration;
248         this.jndiManager = configuration.getJndiManager();
249         try {
250             this.connection = createConnection(this.jndiManager);
251             this.session = createSession(this.connection);
252             this.destination = createDestination(this.jndiManager);
253             this.messageProducer = createMessageProducer(this.session, this.destination);
254             this.connection.start();
255         } catch (NamingException | JMSException e) {
256             this.reconnector = createReconnector();
257             this.reconnector.start();
258         }
259     }
260 
261     private boolean closeConnection() {
262         if (connection == null) {
263             return true;
264         }
265         final Connection temp = connection;
266         connection = null;
267         try {
268             temp.close();
269             return true;
270         } catch (final JMSException e) {
271             StatusLogger.getLogger().debug(
272                     "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
273                     e.getLocalizedMessage(), temp, e);
274             return false;
275         }
276     }
277 
278     private boolean closeJndiManager() {
279         if (jndiManager == null) {
280             return true;
281         }
282         final JndiManager tmp = jndiManager;
283         jndiManager = null;
284         tmp.close();
285         return true;
286     }
287 
288     private boolean closeMessageProducer() {
289         if (messageProducer == null) {
290             return true;
291         }
292         final MessageProducer temp = messageProducer;
293         messageProducer = null;
294         try {
295             temp.close();
296             return true;
297         } catch (final JMSException e) {
298             StatusLogger.getLogger().debug(
299                     "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
300                     e.getLocalizedMessage(), temp, e);
301             return false;
302         }
303     }
304 
305     private boolean closeSession() {
306         if (session == null) {
307             return true;
308         }
309         final Session temp = session;
310         session = null;
311         try {
312             temp.close();
313             return true;
314         } catch (final JMSException e) {
315             StatusLogger.getLogger().debug(
316                     "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
317                     e.getLocalizedMessage(), temp, e);
318             return false;
319         }
320     }
321 
322     private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
323         final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
324         if (configuration.getUserName() != null && configuration.getPassword() != null) {
325             return connectionFactory.createConnection(configuration.getUserName(),
326                     configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
327         }
328         return connectionFactory.createConnection();
329 
330     }
331 
332     private Destination createDestination(final JndiManager jndiManager) throws NamingException {
333         return jndiManager.lookup(configuration.getDestinationName());
334     }
335 
336     /**
337      * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
338      * <p>
339      * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
340      * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
341      * message will be serialized to a String.
342      * </p>
343      * <p>
344      * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
345      * will be serialized as a Java object.
346      * </p>
347      * <p>
348      * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
349      * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
350      * </p>
351      *
352      * @param object
353      *            The LogEvent or String message to wrap.
354      * @return A new JMS message containing the provided object.
355      * @throws JMSException
356      */
357     public Message createMessage(final Serializable object) throws JMSException {
358         if (object instanceof String) {
359             return this.session.createTextMessage((String) object);
360         } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
361             return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
362         }
363         return this.session.createObjectMessage(object);
364     }
365 
366     private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
367         final Message message = createMessage(serializable);
368         message.setJMSTimestamp(event.getTimeMillis());
369         messageProducer.send(message);
370     }
371 
372     /**
373      * Creates a MessageConsumer on this Destination using the current Session.
374      *
375      * @return A MessageConsumer on this Destination.
376      * @throws JMSException
377      */
378     public MessageConsumer createMessageConsumer() throws JMSException {
379         return this.session.createConsumer(this.destination);
380     }
381 
382     /**
383      * Creates a MessageProducer on this Destination using the current Session.
384      *
385      * @param session
386      *            The JMS Session to use to create the MessageProducer
387      * @param destination
388      *            The JMS Destination for the MessageProducer
389      * @return A MessageProducer on this Destination.
390      * @throws JMSException
391      */
392     public MessageProducer createMessageProducer(final Session session, final Destination destination)
393             throws JMSException {
394         return session.createProducer(destination);
395     }
396 
397     private Reconnector createReconnector() {
398         final Reconnector recon = new Reconnector(this);
399         recon.setDaemon(true);
400         recon.setPriority(Thread.MIN_PRIORITY);
401         return recon;
402     }
403 
404     private Session createSession(final Connection connection) throws JMSException {
405         return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
406     }
407 
408     public JmsManagerConfiguration getJmsManagerConfiguration() {
409         return configuration;
410     }
411 
412     JndiManager getJndiManager() {
413         return configuration.getJndiManager();
414     }
415 
416     <T> T lookup(final String destinationName) throws NamingException {
417         return this.jndiManager.lookup(destinationName);
418     }
419 
420     private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
421             final MapMessage jmsMapMessage) {
422         // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
423         log4jMapMessage.forEach(new BiConsumer<String, Object>() {
424             @Override
425             public void accept(final String key, final Object value) {
426                 try {
427                     jmsMapMessage.setObject(key, value);
428                 } catch (final JMSException e) {
429                     throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
430                             e.getClass(), key, value, e.getLocalizedMessage()), e);
431                 }
432             }
433         });
434         return jmsMapMessage;
435     }
436 
437     @Override
438     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
439         if (reconnector != null) {
440             reconnector.shutdown();
441             reconnector.interrupt();
442             reconnector = null;
443         }
444         boolean closed = false;
445         closed &= closeJndiManager();
446         closed &= closeMessageProducer();
447         closed &= closeSession();
448         closed &= closeConnection();
449         return closed && this.jndiManager.stop(timeout, timeUnit);
450     }
451 
452     void send(final LogEvent event, final Serializable serializable) {
453         if (messageProducer == null) {
454             if (reconnector != null && !configuration.isImmediateFail()) {
455                 reconnector.latch();
456             }
457             if (messageProducer == null) {
458                 throw new AppenderLoggingException(
459                         "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
460             }
461         }
462         synchronized (this) {
463             try {
464                 createMessageAndSend(event, serializable);
465             } catch (final JMSException causeEx) {
466                 if (configuration.isRetry() && reconnector == null) {
467                     reconnector = createReconnector();
468                     try {
469                         closeJndiManager();
470                         reconnector.reconnect();
471                     } catch (NamingException | JMSException reconnEx) {
472                         LOGGER.debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
473                                 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
474                         reconnector.start();
475                         throw new AppenderLoggingException(
476                                 String.format("Error sending to %s for %s", getName(), configuration), causeEx);
477                     }
478                     try {
479                         createMessageAndSend(event, serializable);
480                     } catch (final JMSException e) {
481                         throw new AppenderLoggingException(
482                                 String.format("Error sending to %s after reestablishing connection for %s", getName(),
483                                         configuration),
484                                 causeEx);
485                     }
486                 }
487             }
488         }
489     }
490 
491 }