1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom;
19
20 import java.io.Serializable;
21 import java.util.Arrays;
22 import java.util.Properties;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25
26 import javax.jms.Connection;
27 import javax.jms.ConnectionFactory;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.MapMessage;
31 import javax.jms.Message;
32 import javax.jms.MessageConsumer;
33 import javax.jms.MessageProducer;
34 import javax.jms.Session;
35 import javax.naming.NamingException;
36
37 import org.apache.logging.log4j.Logger;
38 import org.apache.logging.log4j.core.LogEvent;
39 import org.apache.logging.log4j.core.appender.AbstractManager;
40 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
41 import org.apache.logging.log4j.core.appender.ManagerFactory;
42 import org.apache.logging.log4j.core.net.JndiManager;
43 import org.apache.logging.log4j.core.util.Log4jThread;
44 import org.apache.logging.log4j.status.StatusLogger;
45 import org.apache.logging.log4j.util.BiConsumer;
46
47
48
49
50
51
52
53
54
55 public class JmsManager extends AbstractManager {
56
57 public static class JmsManagerConfiguration {
58 private final Properties jndiProperties;
59 private final String connectionFactoryName;
60 private final String destinationName;
61 private final String userName;
62 private final char[] password;
63 private final boolean immediateFail;
64 private final boolean retry;
65 private final long reconnectIntervalMillis;
66
67 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
68 final String destinationName, final String userName, final char[] password, final boolean immediateFail,
69 final long reconnectIntervalMillis) {
70 this.jndiProperties = jndiProperties;
71 this.connectionFactoryName = connectionFactoryName;
72 this.destinationName = destinationName;
73 this.userName = userName;
74 this.password = password;
75 this.immediateFail = immediateFail;
76 this.reconnectIntervalMillis = reconnectIntervalMillis;
77 this.retry = reconnectIntervalMillis > 0;
78 }
79
80 public String getConnectionFactoryName() {
81 return connectionFactoryName;
82 }
83
84 public String getDestinationName() {
85 return destinationName;
86 }
87
88 public JndiManager getJndiManager() {
89 return JndiManager.getJndiManager(getJndiProperties());
90 }
91
92 public Properties getJndiProperties() {
93 return jndiProperties;
94 }
95
96 public char[] getPassword() {
97 return password;
98 }
99
100 public long getReconnectIntervalMillis() {
101 return reconnectIntervalMillis;
102 }
103
104 public String getUserName() {
105 return userName;
106 }
107
108 public boolean isImmediateFail() {
109 return immediateFail;
110 }
111
112 public boolean isRetry() {
113 return retry;
114 }
115
116 @Override
117 public String toString() {
118 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
119 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
120 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
121 + reconnectIntervalMillis + "]";
122 }
123
124 }
125
126 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
127
128 @Override
129 public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
130 try {
131 return new JmsManager(name, data);
132 } catch (final Exception e) {
133 LOGGER.error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
134 return null;
135 }
136 }
137 }
138
139
140
141
142 private class Reconnector extends Log4jThread {
143
144 private final CountDownLatch latch = new CountDownLatch(1);
145
146 private volatile boolean shutdown = false;
147
148 private final Object owner;
149
150 public Reconnector(final Object owner) {
151 super("JmsManager-Reconnector");
152 this.owner = owner;
153 }
154
155 public void latch() {
156 try {
157 latch.await();
158 } catch (final InterruptedException ex) {
159
160 }
161 }
162
163 void reconnect() throws NamingException, JMSException {
164 final JndiManager jndiManager2 = getJndiManager();
165 final Connection connection2 = createConnection(jndiManager2);
166 final Session session2 = createSession(connection2);
167 final Destination destination2 = createDestination(jndiManager2);
168 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
169 connection2.start();
170 synchronized (owner) {
171 jndiManager = jndiManager2;
172 connection = connection2;
173 session = session2;
174 destination = destination2;
175 messageProducer = messageProducer2;
176 reconnector = null;
177 shutdown = true;
178 }
179 LOGGER.debug("Connection reestablished to {}", configuration);
180 }
181
182 @Override
183 public void run() {
184 while (!shutdown) {
185 try {
186 sleep(configuration.getReconnectIntervalMillis());
187 reconnect();
188 } catch (final InterruptedException | JMSException | NamingException e) {
189 LOGGER.debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
190 e);
191 } finally {
192 latch.countDown();
193 }
194 }
195 }
196
197 public void shutdown() {
198 shutdown = true;
199 }
200
201 }
202
/** Status logger used for this manager's diagnostic and error output. */
private static final Logger LOGGER = StatusLogger.getLogger();

/** Factory instance that {@code getJmsManager} passes to {@code getManager}. */
static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230 final String connectionFactoryName, final String destinationName, final String userName,
231 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234 return getManager(name, FACTORY, configuration);
235 }
236
/** Settings this manager was constructed with; also consulted on every reconnect attempt. */
private final JmsManagerConfiguration configuration;

// The fields below are volatile. Reconnector.reconnect() reassigns them, and send() checks
// messageProducer and reconnector before it enters its synchronized block.

/** Active reconnector, or null when none is pending. */
private volatile Reconnector reconnector;
/** JNDI manager used for lookups; set to null by closeJndiManager(). */
private volatile JndiManager jndiManager;
/** Current JMS connection; set to null by closeConnection(). */
private volatile Connection connection;
/** Current JMS session; set to null by closeSession(). */
private volatile Session session;
/** Destination looked up through JNDI. */
private volatile Destination destination;
/** Current message producer; set to null by closeMessageProducer(). */
private volatile MessageProducer messageProducer;
245
/**
 * Creates the manager and tries to set up its JMS connection, session, destination and producer right away.
 * If that fails, the failure is logged, anything created so far is closed, and a reconnector thread is
 * started to keep trying in the background.
 *
 * @param name the manager name
 * @param configuration the settings used for JNDI lookups and for creating the connection
 */
private JmsManager(final String name, final JmsManagerConfiguration configuration) {
    super(null, name);
    this.configuration = configuration;
    this.jndiManager = configuration.getJndiManager();
    try {
        this.connection = createConnection(this.jndiManager);
        this.session = createSession(this.connection);
        this.destination = createDestination(this.jndiManager);
        this.messageProducer = createMessageProducer(this.session, this.destination);
        this.connection.start();
    } catch (final NamingException | JMSException e) {
        // Record why the initial connection failed; previously this exception vanished without a trace.
        LOGGER.debug("Cannot establish JMS connection to {}: {}; starting reconnector thread", configuration,
                e.getLocalizedMessage(), e);
        // Release whatever was created before the failure. The reconnector overwrites these fields without
        // closing them, so leaving them set would leak the half-built connection.
        closeMessageProducer();
        closeSession();
        closeConnection();
        this.reconnector = createReconnector();
        this.reconnector.start();
    }
}
261
262 private boolean closeConnection() {
263 if (connection == null) {
264 return true;
265 }
266 final Connection temp = connection;
267 connection = null;
268 try {
269 temp.close();
270 return true;
271 } catch (final JMSException e) {
272 StatusLogger.getLogger().debug(
273 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274 e.getLocalizedMessage(), temp, e);
275 return false;
276 }
277 }
278
279 private boolean closeJndiManager() {
280 if (jndiManager == null) {
281 return true;
282 }
283 final JndiManager tmp = jndiManager;
284 jndiManager = null;
285 tmp.close();
286 return true;
287 }
288
289 private boolean closeMessageProducer() {
290 if (messageProducer == null) {
291 return true;
292 }
293 final MessageProducer temp = messageProducer;
294 messageProducer = null;
295 try {
296 temp.close();
297 return true;
298 } catch (final JMSException e) {
299 StatusLogger.getLogger().debug(
300 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301 e.getLocalizedMessage(), temp, e);
302 return false;
303 }
304 }
305
306 private boolean closeSession() {
307 if (session == null) {
308 return true;
309 }
310 final Session temp = session;
311 session = null;
312 try {
313 temp.close();
314 return true;
315 } catch (final JMSException e) {
316 StatusLogger.getLogger().debug(
317 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318 e.getLocalizedMessage(), temp, e);
319 return false;
320 }
321 }
322
323 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325 if (configuration.getUserName() != null && configuration.getPassword() != null) {
326 return connectionFactory.createConnection(configuration.getUserName(),
327 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328 }
329 return connectionFactory.createConnection();
330
331 }
332
333 private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334 return jndiManager.lookup(configuration.getDestinationName());
335 }
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 public Message createMessage(final Serializable object) throws JMSException {
359 if (object instanceof String) {
360 return this.session.createTextMessage((String) object);
361 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363 }
364 return this.session.createObjectMessage(object);
365 }
366
367 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368 final Message message = createMessage(serializable);
369 message.setJMSTimestamp(event.getTimeMillis());
370 messageProducer.send(message);
371 }
372
373
374
375
376
377
378
379 public MessageConsumer createMessageConsumer() throws JMSException {
380 return this.session.createConsumer(this.destination);
381 }
382
383
384
385
386
387
388
389
390
391
392
393 public MessageProducer createMessageProducer(final Session session, final Destination destination)
394 throws JMSException {
395 return session.createProducer(destination);
396 }
397
398 private Reconnector createReconnector() {
399 final Reconnector recon = new Reconnector(this);
400 recon.setDaemon(true);
401 recon.setPriority(Thread.MIN_PRIORITY);
402 return recon;
403 }
404
405 private Session createSession(final Connection connection) throws JMSException {
406 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407 }
408
409 public JmsManagerConfiguration getJmsManagerConfiguration() {
410 return configuration;
411 }
412
413 JndiManager getJndiManager() {
414 return configuration.getJndiManager();
415 }
416
417 <T> T lookup(final String destinationName) throws NamingException {
418 return this.jndiManager.lookup(destinationName);
419 }
420
421 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422 final MapMessage jmsMapMessage) {
423
424 log4jMapMessage.forEach(new BiConsumer<String, Object>() {
425 @Override
426 public void accept(final String key, final Object value) {
427 try {
428 jmsMapMessage.setObject(key, value);
429 } catch (final JMSException e) {
430 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
431 e.getClass(), key, value, e.getLocalizedMessage()), e);
432 }
433 }
434 });
435 return jmsMapMessage;
436 }
437
438 @Override
439 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
440 if (reconnector != null) {
441 reconnector.shutdown();
442 reconnector.interrupt();
443 reconnector = null;
444 }
445 boolean closed = false;
446 closed &= closeJndiManager();
447 closed &= closeMessageProducer();
448 closed &= closeSession();
449 closed &= closeConnection();
450 return closed && this.jndiManager.stop(timeout, timeUnit);
451 }
452
453 void send(final LogEvent event, final Serializable serializable) {
454 if (messageProducer == null) {
455 if (reconnector != null && !configuration.isImmediateFail()) {
456 reconnector.latch();
457 }
458 if (messageProducer == null) {
459 throw new AppenderLoggingException(
460 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
461 }
462 }
463 synchronized (this) {
464 try {
465 createMessageAndSend(event, serializable);
466 } catch (final JMSException causeEx) {
467 if (configuration.isRetry() && reconnector == null) {
468 reconnector = createReconnector();
469 try {
470 closeJndiManager();
471 reconnector.reconnect();
472 } catch (NamingException | JMSException reconnEx) {
473 LOGGER.debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
474 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
475 reconnector.start();
476 throw new AppenderLoggingException(
477 String.format("Error sending to %s for %s", getName(), configuration), causeEx);
478 }
479 try {
480 createMessageAndSend(event, serializable);
481 } catch (final JMSException e) {
482 throw new AppenderLoggingException(
483 String.format("Error sending to %s after reestablishing connection for %s", getName(),
484 configuration),
485 causeEx);
486 }
487 }
488 }
489 }
490 }
491
492 }