1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom;
19
20 import java.io.Serializable;
21 import java.util.Properties;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.MapMessage;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.naming.NamingException;
35
36 import org.apache.logging.log4j.core.LogEvent;
37 import org.apache.logging.log4j.core.appender.AbstractManager;
38 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
39 import org.apache.logging.log4j.core.appender.ManagerFactory;
40 import org.apache.logging.log4j.core.net.JndiManager;
41 import org.apache.logging.log4j.core.util.Log4jThread;
42 import org.apache.logging.log4j.status.StatusLogger;
43 import org.apache.logging.log4j.util.BiConsumer;
44
45
46
47
48
49
50
51
52
53 public class JmsManager extends AbstractManager {
54
55 public static class JmsManagerConfiguration {
56 private final Properties jndiProperties;
57 private final String connectionFactoryName;
58 private final String destinationName;
59 private final String userName;
60 private final char[] password;
61 private final boolean immediateFail;
62 private final boolean retry;
63 private final long reconnectIntervalMillis;
64
65 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
66 final String destinationName, final String userName, final char[] password, final boolean immediateFail,
67 final long reconnectIntervalMillis) {
68 this.jndiProperties = jndiProperties;
69 this.connectionFactoryName = connectionFactoryName;
70 this.destinationName = destinationName;
71 this.userName = userName;
72 this.password = password;
73 this.immediateFail = immediateFail;
74 this.reconnectIntervalMillis = reconnectIntervalMillis;
75 this.retry = reconnectIntervalMillis > 0;
76 }
77
78 public String getConnectionFactoryName() {
79 return connectionFactoryName;
80 }
81
82 public String getDestinationName() {
83 return destinationName;
84 }
85
86 public JndiManager getJndiManager() {
87 return JndiManager.getJndiManager(getJndiProperties());
88 }
89
90 public Properties getJndiProperties() {
91 return jndiProperties;
92 }
93
94 public char[] getPassword() {
95 return password;
96 }
97
98 public long getReconnectIntervalMillis() {
99 return reconnectIntervalMillis;
100 }
101
102 public String getUserName() {
103 return userName;
104 }
105
106 public boolean isImmediateFail() {
107 return immediateFail;
108 }
109
110 public boolean isRetry() {
111 return retry;
112 }
113
114 @Override
115 public String toString() {
116 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119 + reconnectIntervalMillis + "]";
120 }
121
122 }
123
124 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125
126 @Override
127 public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128 try {
129 return new JmsManager(name, data);
130 } catch (final Exception e) {
131 logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
132 return null;
133 }
134 }
135 }
136
137
138
139
140 private class Reconnector extends Log4jThread {
141
142 private final CountDownLatch latch = new CountDownLatch(1);
143
144 private volatile boolean shutdown = false;
145
146 private final Object owner;
147
148 private Reconnector(final Object owner) {
149 super("JmsManager-Reconnector");
150 this.owner = owner;
151 }
152
153 public void latch() {
154 try {
155 latch.await();
156 } catch (final InterruptedException ex) {
157
158 }
159 }
160
161 void reconnect() throws NamingException, JMSException {
162 final JndiManager jndiManager2 = getJndiManager();
163 final Connection connection2 = createConnection(jndiManager2);
164 final Session session2 = createSession(connection2);
165 final Destination destination2 = createDestination(jndiManager2);
166 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
167 connection2.start();
168 synchronized (owner) {
169 jndiManager = jndiManager2;
170 connection = connection2;
171 session = session2;
172 destination = destination2;
173 messageProducer = messageProducer2;
174 reconnector = null;
175 shutdown = true;
176 }
177 logger().debug("Connection reestablished to {}", configuration);
178 }
179
180 @Override
181 public void run() {
182 while (!shutdown) {
183 try {
184 sleep(configuration.getReconnectIntervalMillis());
185 reconnect();
186 } catch (final InterruptedException | JMSException | NamingException e) {
187 logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
188 e);
189 } finally {
190 latch.countDown();
191 }
192 }
193 }
194
195 public void shutdown() {
196 shutdown = true;
197 }
198
199 }
200
201 static final JmsManagerFactory FACTORY = new JmsManagerFactory();
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225 public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
226 final String connectionFactoryName, final String destinationName, final String userName,
227 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
228 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
229 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
230 return getManager(name, FACTORY, configuration);
231 }
232
233 private final JmsManagerConfiguration configuration;
234
235 private volatile Reconnector reconnector;
236 private volatile JndiManager jndiManager;
237 private volatile Connection connection;
238 private volatile Session session;
239 private volatile Destination destination;
240 private volatile MessageProducer messageProducer;
241
242 private JmsManager(final String name, final JmsManagerConfiguration configuration) {
243 super(null, name);
244 this.configuration = configuration;
245 this.jndiManager = configuration.getJndiManager();
246 try {
247 this.connection = createConnection(this.jndiManager);
248 this.session = createSession(this.connection);
249 this.destination = createDestination(this.jndiManager);
250 this.messageProducer = createMessageProducer(this.session, this.destination);
251 this.connection.start();
252 } catch (NamingException | JMSException e) {
253 this.reconnector = createReconnector();
254 this.reconnector.start();
255 }
256 }
257
258 private boolean closeConnection() {
259 if (connection == null) {
260 return true;
261 }
262 final Connection temp = connection;
263 connection = null;
264 try {
265 temp.close();
266 return true;
267 } catch (final JMSException e) {
268 StatusLogger.getLogger().debug(
269 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
270 e.getLocalizedMessage(), temp, e);
271 return false;
272 }
273 }
274
275 private boolean closeJndiManager() {
276 if (jndiManager == null) {
277 return true;
278 }
279 final JndiManager tmp = jndiManager;
280 jndiManager = null;
281 tmp.close();
282 return true;
283 }
284
285 private boolean closeMessageProducer() {
286 if (messageProducer == null) {
287 return true;
288 }
289 final MessageProducer temp = messageProducer;
290 messageProducer = null;
291 try {
292 temp.close();
293 return true;
294 } catch (final JMSException e) {
295 StatusLogger.getLogger().debug(
296 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
297 e.getLocalizedMessage(), temp, e);
298 return false;
299 }
300 }
301
302 private boolean closeSession() {
303 if (session == null) {
304 return true;
305 }
306 final Session temp = session;
307 session = null;
308 try {
309 temp.close();
310 return true;
311 } catch (final JMSException e) {
312 StatusLogger.getLogger().debug(
313 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
314 e.getLocalizedMessage(), temp, e);
315 return false;
316 }
317 }
318
319 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
320 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
321 if (configuration.getUserName() != null && configuration.getPassword() != null) {
322 return connectionFactory.createConnection(configuration.getUserName(),
323 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
324 }
325 return connectionFactory.createConnection();
326
327 }
328
329 private Destination createDestination(final JndiManager jndiManager) throws NamingException {
330 return jndiManager.lookup(configuration.getDestinationName());
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354 public Message createMessage(final Serializable object) throws JMSException {
355 if (object instanceof String) {
356 return this.session.createTextMessage((String) object);
357 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
358 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
359 }
360 return this.session.createObjectMessage(object);
361 }
362
363 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
364 final Message message = createMessage(serializable);
365 message.setJMSTimestamp(event.getTimeMillis());
366 messageProducer.send(message);
367 }
368
369
370
371
372
373
374
375 public MessageConsumer createMessageConsumer() throws JMSException {
376 return this.session.createConsumer(this.destination);
377 }
378
379
380
381
382
383
384
385
386
387
388
389 public MessageProducer createMessageProducer(final Session session, final Destination destination)
390 throws JMSException {
391 return session.createProducer(destination);
392 }
393
394 private Reconnector createReconnector() {
395 final Reconnector recon = new Reconnector(this);
396 recon.setDaemon(true);
397 recon.setPriority(Thread.MIN_PRIORITY);
398 return recon;
399 }
400
401 private Session createSession(final Connection connection) throws JMSException {
402 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
403 }
404
405 public JmsManagerConfiguration getJmsManagerConfiguration() {
406 return configuration;
407 }
408
409 JndiManager getJndiManager() {
410 return configuration.getJndiManager();
411 }
412
413 <T> T lookup(final String destinationName) throws NamingException {
414 return this.jndiManager.lookup(destinationName);
415 }
416
417 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
418 final MapMessage jmsMapMessage) {
419
420 log4jMapMessage.forEach(new BiConsumer<String, Object>() {
421 @Override
422 public void accept(final String key, final Object value) {
423 try {
424 jmsMapMessage.setObject(key, value);
425 } catch (final JMSException e) {
426 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
427 e.getClass(), key, value, e.getLocalizedMessage()), e);
428 }
429 }
430 });
431 return jmsMapMessage;
432 }
433
434 @Override
435 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
436 if (reconnector != null) {
437 reconnector.shutdown();
438 reconnector.interrupt();
439 reconnector = null;
440 }
441 boolean closed = false;
442 closed &= closeJndiManager();
443 closed &= closeMessageProducer();
444 closed &= closeSession();
445 closed &= closeConnection();
446 return closed && this.jndiManager.stop(timeout, timeUnit);
447 }
448
449 void send(final LogEvent event, final Serializable serializable) {
450 if (messageProducer == null) {
451 if (reconnector != null && !configuration.isImmediateFail()) {
452 reconnector.latch();
453 if (messageProducer == null) {
454 throw new AppenderLoggingException(
455 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
456 }
457 }
458 }
459 synchronized (this) {
460 try {
461 createMessageAndSend(event, serializable);
462 } catch (final JMSException causeEx) {
463 if (configuration.isRetry() && reconnector == null) {
464 reconnector = createReconnector();
465 try {
466 closeJndiManager();
467 reconnector.reconnect();
468 } catch (NamingException | JMSException reconnEx) {
469 logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
470 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
471 reconnector.start();
472 throw new AppenderLoggingException(
473 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
474 }
475 try {
476 createMessageAndSend(event, serializable);
477 } catch (final JMSException e) {
478 throw new AppenderLoggingException(
479 String.format("Error sending to %s after reestablishing JMS connection for %s",
480 getName(), configuration),
481 causeEx);
482 }
483 }
484 }
485 }
486 }
487
488 }