View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom;
19  
20  import java.io.Serializable;
21  import java.util.Arrays;
22  import java.util.Properties;
23  import java.util.concurrent.CountDownLatch;
24  import java.util.concurrent.TimeUnit;
25  
26  import javax.jms.Connection;
27  import javax.jms.ConnectionFactory;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.MapMessage;
31  import javax.jms.Message;
32  import javax.jms.MessageConsumer;
33  import javax.jms.MessageProducer;
34  import javax.jms.Session;
35  import javax.naming.NamingException;
36  
37  import org.apache.logging.log4j.Logger;
38  import org.apache.logging.log4j.core.LogEvent;
39  import org.apache.logging.log4j.core.appender.AbstractManager;
40  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
41  import org.apache.logging.log4j.core.appender.ManagerFactory;
42  import org.apache.logging.log4j.core.net.JndiManager;
43  import org.apache.logging.log4j.core.util.Log4jThread;
44  import org.apache.logging.log4j.status.StatusLogger;
45  import org.apache.logging.log4j.util.BiConsumer;
46  
47  /**
48   * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
49   *
50   * <p>
51   * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
52   * involving a configured ConnectionFactory and Destination.
53   * </p>
54   */
55  public class JmsManager extends AbstractManager {
56  
57      public static class JmsManagerConfiguration {
58          private final Properties jndiProperties;
59          private final String connectionFactoryName;
60          private final String destinationName;
61          private final String userName;
62          private final char[] password;
63          private final boolean immediateFail;
64          private final boolean retry;
65          private final long reconnectIntervalMillis;
66  
67          JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
68                  final String destinationName, final String userName, final char[] password, final boolean immediateFail,
69                  final long reconnectIntervalMillis) {
70              this.jndiProperties = jndiProperties;
71              this.connectionFactoryName = connectionFactoryName;
72              this.destinationName = destinationName;
73              this.userName = userName;
74              this.password = password;
75              this.immediateFail = immediateFail;
76              this.reconnectIntervalMillis = reconnectIntervalMillis;
77              this.retry = reconnectIntervalMillis > 0;
78          }
79  
80          public String getConnectionFactoryName() {
81              return connectionFactoryName;
82          }
83  
84          public String getDestinationName() {
85              return destinationName;
86          }
87  
88          public JndiManager getJndiManager() {
89              return JndiManager.getJndiManager(getJndiProperties());
90          }
91  
92          public Properties getJndiProperties() {
93              return jndiProperties;
94          }
95  
96          public char[] getPassword() {
97              return password;
98          }
99  
100         public long getReconnectIntervalMillis() {
101             return reconnectIntervalMillis;
102         }
103 
104         public String getUserName() {
105             return userName;
106         }
107 
108         public boolean isImmediateFail() {
109             return immediateFail;
110         }
111 
112         public boolean isRetry() {
113             return retry;
114         }
115 
116         @Override
117         public String toString() {
118             return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
119                     + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
120                     + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
121                     + reconnectIntervalMillis + "]";
122         }
123 
124     }
125 
126     private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
127 
128         @Override
129         public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
130             try {
131                 return new JmsManager(name, data);
132             } catch (final Exception e) {
133                 LOGGER.error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
134                 return null;
135             }
136         }
137     }
138 
139     /**
140      * Handles reconnecting to a Socket on a Thread.
141      */
142     private class Reconnector extends Log4jThread {
143 
144         private final CountDownLatch latch = new CountDownLatch(1);
145 
146         private volatile boolean shutdown = false;
147 
148         private final Object owner;
149 
150         public Reconnector(final Object owner) {
151             super("JmsManager-Reconnector");
152             this.owner = owner;
153         }
154 
155         public void latch() {
156             try {
157                 latch.await();
158             } catch (final InterruptedException ex) {
159                 // Ignore the exception.
160             }
161         }
162 
163         void reconnect() throws NamingException, JMSException {
164             final JndiManager jndiManager2 = getJndiManager();
165             final Connection connection2 = createConnection(jndiManager2);
166             final Session session2 = createSession(connection2);
167             final Destination destination2 = createDestination(jndiManager2);
168             final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
169             connection2.start();
170             synchronized (owner) {
171                 jndiManager = jndiManager2;
172                 connection = connection2;
173                 session = session2;
174                 destination = destination2;
175                 messageProducer = messageProducer2;
176                 reconnector = null;
177                 shutdown = true;
178             }
179             LOGGER.debug("Connection reestablished to {}", configuration);
180         }
181 
182         @Override
183         public void run() {
184             while (!shutdown) {
185                 try {
186                     sleep(configuration.getReconnectIntervalMillis());
187                     reconnect();
188                 } catch (final InterruptedException | JMSException | NamingException e) {
189                     LOGGER.debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
190                             e);
191                 } finally {
192                     latch.countDown();
193                 }
194             }
195         }
196 
197         public void shutdown() {
198             shutdown = true;
199         }
200 
201     }
202 
    // Status logger used for this manager's own diagnostics.
    private static final Logger LOGGER = StatusLogger.getLogger();

    // Factory handed to getManager(...) by getJmsManager(...) to construct JmsManager instances.
    static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206 
207     /**
208      * Gets a JmsManager using the specified configuration parameters.
209      *
210      * @param name
211      *            The name to use for this JmsManager.
212      * @param connectionFactoryName
213      *            The binding name for the {@link javax.jms.ConnectionFactory}.
214      * @param destinationName
215      *            The binding name for the {@link javax.jms.Destination}.
216      * @param userName
217      *            The userName to connect with or {@code null} for no authentication.
218      * @param password
219      *            The password to use with the given userName or {@code null} for no authentication.
220      * @param immediateFail
221      *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
222      *            fails.
223      * @param reconnectIntervalMillis
224      *            How to log sleep in milliseconds before trying to reconnect to JMS.
225      * @param jndiManager
226      *            The JndiManager to look up JMS information through.
227      * @return The JmsManager as configured.
228      */
229     public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230             final String connectionFactoryName, final String destinationName, final String userName,
231             final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232         final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233                 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234         return getManager(name, FACTORY, configuration);
235     }
236 
    // Settings this manager was created with; also used for every reconnect attempt.
    private final JmsManagerConfiguration configuration;

    // The fields below are volatile because they are written by the Reconnector thread as well as by
    // the threads that send or close; each is null when the corresponding object is not available.
    private volatile Reconnector reconnector;
    private volatile JndiManager jndiManager;
    private volatile Connection connection;
    private volatile Session session;
    private volatile Destination destination;
    private volatile MessageProducer messageProducer;
245 
246     private JmsManager(final String name, final JmsManagerConfiguration configuration) {
247         super(null, name);
248         this.configuration = configuration;
249         this.jndiManager = configuration.getJndiManager();
250         try {
251             this.connection = createConnection(this.jndiManager);
252             this.session = createSession(this.connection);
253             this.destination = createDestination(this.jndiManager);
254             this.messageProducer = createMessageProducer(this.session, this.destination);
255             this.connection.start();
256         } catch (NamingException | JMSException e) {
257             this.reconnector = createReconnector();
258             this.reconnector.start();
259         }
260     }
261 
262     private boolean closeConnection() {
263         if (connection == null) {
264             return true;
265         }
266         final Connection temp = connection;
267         connection = null;
268         try {
269             temp.close();
270             return true;
271         } catch (final JMSException e) {
272             StatusLogger.getLogger().debug(
273                     "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274                     e.getLocalizedMessage(), temp, e);
275             return false;
276         }
277     }
278 
279     private boolean closeJndiManager() {
280         if (jndiManager == null) {
281             return true;
282         }
283         final JndiManager tmp = jndiManager;
284         jndiManager = null;
285         tmp.close();
286         return true;
287     }
288 
289     private boolean closeMessageProducer() {
290         if (messageProducer == null) {
291             return true;
292         }
293         final MessageProducer temp = messageProducer;
294         messageProducer = null;
295         try {
296             temp.close();
297             return true;
298         } catch (final JMSException e) {
299             StatusLogger.getLogger().debug(
300                     "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301                     e.getLocalizedMessage(), temp, e);
302             return false;
303         }
304     }
305 
306     private boolean closeSession() {
307         if (session == null) {
308             return true;
309         }
310         final Session temp = session;
311         session = null;
312         try {
313             temp.close();
314             return true;
315         } catch (final JMSException e) {
316             StatusLogger.getLogger().debug(
317                     "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318                     e.getLocalizedMessage(), temp, e);
319             return false;
320         }
321     }
322 
323     private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324         final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325         if (configuration.getUserName() != null && configuration.getPassword() != null) {
326             return connectionFactory.createConnection(configuration.getUserName(),
327                     configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328         }
329         return connectionFactory.createConnection();
330 
331     }
332 
333     private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334         return jndiManager.lookup(configuration.getDestinationName());
335     }
336 
337     /**
338      * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
339      * <p>
340      * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
341      * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
342      * message will be serialized to a String.
343      * </p>
344      * <p>
345      * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
346      * will be serialized as a Java object.
347      * </p>
348      * <p>
349      * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
350      * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
351      * </p>
352      *
353      * @param object
354      *            The LogEvent or String message to wrap.
355      * @return A new JMS message containing the provided object.
356      * @throws JMSException
357      */
358     public Message createMessage(final Serializable object) throws JMSException {
359         if (object instanceof String) {
360             return this.session.createTextMessage((String) object);
361         } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362             return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363         }
364         return this.session.createObjectMessage(object);
365     }
366 
367     private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368         final Message message = createMessage(serializable);
369         message.setJMSTimestamp(event.getTimeMillis());
370         messageProducer.send(message);
371     }
372 
373     /**
374      * Creates a MessageConsumer on this Destination using the current Session.
375      *
376      * @return A MessageConsumer on this Destination.
377      * @throws JMSException
378      */
379     public MessageConsumer createMessageConsumer() throws JMSException {
380         return this.session.createConsumer(this.destination);
381     }
382 
383     /**
384      * Creates a MessageProducer on this Destination using the current Session.
385      *
386      * @param session
387      *            The JMS Session to use to create the MessageProducer
388      * @param destination
389      *            The JMS Destination for the MessageProducer
390      * @return A MessageProducer on this Destination.
391      * @throws JMSException
392      */
393     public MessageProducer createMessageProducer(final Session session, final Destination destination)
394             throws JMSException {
395         return session.createProducer(destination);
396     }
397 
398     private Reconnector createReconnector() {
399         final Reconnector recon = new Reconnector(this);
400         recon.setDaemon(true);
401         recon.setPriority(Thread.MIN_PRIORITY);
402         return recon;
403     }
404 
405     private Session createSession(final Connection connection) throws JMSException {
406         return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407     }
408 
409     public JmsManagerConfiguration getJmsManagerConfiguration() {
410         return configuration;
411     }
412 
413     JndiManager getJndiManager() {
414         return configuration.getJndiManager();
415     }
416 
417     <T> T lookup(final String destinationName) throws NamingException {
418         return this.jndiManager.lookup(destinationName);
419     }
420 
421     private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422             final MapMessage jmsMapMessage) {
423         // Map without calling rg.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
424         log4jMapMessage.forEach(new BiConsumer<String, Object>() {
425             @Override
426             public void accept(final String key, final Object value) {
427                 try {
428                     jmsMapMessage.setObject(key, value);
429                 } catch (final JMSException e) {
430                     throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
431                             e.getClass(), key, value, e.getLocalizedMessage()), e);
432                 }
433             }
434         });
435         return jmsMapMessage;
436     }
437 
438     @Override
439     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
440         if (reconnector != null) {
441             reconnector.shutdown();
442             reconnector.interrupt();
443             reconnector = null;
444         }
445         boolean closed = false;
446         closed &= closeJndiManager();
447         closed &= closeMessageProducer();
448         closed &= closeSession();
449         closed &= closeConnection();
450         return closed && this.jndiManager.stop(timeout, timeUnit);
451     }
452 
453     void send(final LogEvent event, final Serializable serializable) {
454         if (messageProducer == null) {
455             if (reconnector != null && !configuration.isImmediateFail()) {
456                 reconnector.latch();
457             }
458             if (messageProducer == null) {
459                 throw new AppenderLoggingException(
460                         "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
461             }
462         }
463         synchronized (this) {
464             try {
465                 createMessageAndSend(event, serializable);
466             } catch (final JMSException causeEx) {
467                 if (configuration.isRetry() && reconnector == null) {
468                     reconnector = createReconnector();
469                     try {
470                         closeJndiManager();
471                         reconnector.reconnect();
472                     } catch (NamingException | JMSException reconnEx) {
473                         LOGGER.debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
474                                 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
475                         reconnector.start();
476                         throw new AppenderLoggingException(
477                                 String.format("Error sending to %s for %s", getName(), configuration), causeEx);
478                     }
479                     try {
480                         createMessageAndSend(event, serializable);
481                     } catch (final JMSException e) {
482                         throw new AppenderLoggingException(
483                                 String.format("Error sending to %s after reestablishing connection for %s", getName(),
484                                         configuration),
485                                 causeEx);
486                     }
487                 }
488             }
489         }
490     }
491 
492 }