1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom;
19
20 import java.io.Serializable;
21 import java.util.Properties;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.MapMessage;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.naming.NamingException;
35
36 import org.apache.logging.log4j.Logger;
37 import org.apache.logging.log4j.core.LogEvent;
38 import org.apache.logging.log4j.core.appender.AbstractManager;
39 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
40 import org.apache.logging.log4j.core.appender.ManagerFactory;
41 import org.apache.logging.log4j.core.net.JndiManager;
42 import org.apache.logging.log4j.core.util.Log4jThread;
43 import org.apache.logging.log4j.status.StatusLogger;
44 import org.apache.logging.log4j.util.BiConsumer;
45
46
47
48
49
50
51
52
53
54 public class JmsManager extends AbstractManager {
55
56 public static class JmsManagerConfiguration {
57 private final Properties jndiProperties;
58 private final String connectionFactoryName;
59 private final String destinationName;
60 private final String userName;
61 private final char[] password;
62 private final boolean immediateFail;
63 private final boolean retry;
64 private final long reconnectIntervalMillis;
65
66 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
67 final String destinationName, final String userName, final char[] password, final boolean immediateFail,
68 final long reconnectIntervalMillis) {
69 this.jndiProperties = jndiProperties;
70 this.connectionFactoryName = connectionFactoryName;
71 this.destinationName = destinationName;
72 this.userName = userName;
73 this.password = password;
74 this.immediateFail = immediateFail;
75 this.reconnectIntervalMillis = reconnectIntervalMillis;
76 this.retry = reconnectIntervalMillis > 0;
77 }
78
79 public String getConnectionFactoryName() {
80 return connectionFactoryName;
81 }
82
83 public String getDestinationName() {
84 return destinationName;
85 }
86
87 public JndiManager getJndiManager() {
88 return JndiManager.getJndiManager(getJndiProperties());
89 }
90
91 public Properties getJndiProperties() {
92 return jndiProperties;
93 }
94
95 public char[] getPassword() {
96 return password;
97 }
98
99 public long getReconnectIntervalMillis() {
100 return reconnectIntervalMillis;
101 }
102
103 public String getUserName() {
104 return userName;
105 }
106
107 public boolean isImmediateFail() {
108 return immediateFail;
109 }
110
111 public boolean isRetry() {
112 return retry;
113 }
114
115 @Override
116 public String toString() {
117 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
118 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
119 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
120 + reconnectIntervalMillis + "]";
121 }
122
123 }
124
125 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
126
127 @Override
128 public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
129 try {
130 return new JmsManager(name, data);
131 } catch (final Exception e) {
132 LOGGER.error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
133 return null;
134 }
135 }
136 }
137
138
139
140
141 private class Reconnector extends Log4jThread {
142
143 private final CountDownLatch latch = new CountDownLatch(1);
144
145 private volatile boolean shutdown = false;
146
147 private final Object owner;
148
149 public Reconnector(final Object owner) {
150 super("JmsManager-Reconnector");
151 this.owner = owner;
152 }
153
154 public void latch() {
155 try {
156 latch.await();
157 } catch (final InterruptedException ex) {
158
159 }
160 }
161
162 void reconnect() throws NamingException, JMSException {
163 final JndiManager jndiManager2 = getJndiManager();
164 final Connection connection2 = createConnection(jndiManager2);
165 final Session session2 = createSession(connection2);
166 final Destination destination2 = createDestination(jndiManager2);
167 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
168 connection2.start();
169 synchronized (owner) {
170 jndiManager = jndiManager2;
171 connection = connection2;
172 session = session2;
173 destination = destination2;
174 messageProducer = messageProducer2;
175 reconnector = null;
176 shutdown = true;
177 }
178 LOGGER.debug("Connection reestablished to {}", configuration);
179 }
180
181 @Override
182 public void run() {
183 while (!shutdown) {
184 try {
185 sleep(configuration.getReconnectIntervalMillis());
186 reconnect();
187 } catch (final InterruptedException | JMSException | NamingException e) {
188 LOGGER.debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
189 e);
190 } finally {
191 latch.countDown();
192 }
193 }
194 }
195
196 public void shutdown() {
197 shutdown = true;
198 }
199
200 }
201
202 private static final Logger LOGGER = StatusLogger.getLogger();
203
204 static final JmsManagerFactory FACTORY = new JmsManagerFactory();
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228 public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
229 final String connectionFactoryName, final String destinationName, final String userName,
230 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
231 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
232 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
233 return getManager(name, FACTORY, configuration);
234 }
235
236 private final JmsManagerConfiguration configuration;
237
238 private volatile Reconnector reconnector;
239 private volatile JndiManager jndiManager;
240 private volatile Connection connection;
241 private volatile Session session;
242 private volatile Destination destination;
243 private volatile MessageProducer messageProducer;
244
245 private JmsManager(final String name, final JmsManagerConfiguration configuration) {
246 super(null, name);
247 this.configuration = configuration;
248 this.jndiManager = configuration.getJndiManager();
249 try {
250 this.connection = createConnection(this.jndiManager);
251 this.session = createSession(this.connection);
252 this.destination = createDestination(this.jndiManager);
253 this.messageProducer = createMessageProducer(this.session, this.destination);
254 this.connection.start();
255 } catch (NamingException | JMSException e) {
256 this.reconnector = createReconnector();
257 this.reconnector.start();
258 }
259 }
260
261 private boolean closeConnection() {
262 if (connection == null) {
263 return true;
264 }
265 final Connection temp = connection;
266 connection = null;
267 try {
268 temp.close();
269 return true;
270 } catch (final JMSException e) {
271 StatusLogger.getLogger().debug(
272 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
273 e.getLocalizedMessage(), temp, e);
274 return false;
275 }
276 }
277
278 private boolean closeJndiManager() {
279 if (jndiManager == null) {
280 return true;
281 }
282 final JndiManager tmp = jndiManager;
283 jndiManager = null;
284 tmp.close();
285 return true;
286 }
287
288 private boolean closeMessageProducer() {
289 if (messageProducer == null) {
290 return true;
291 }
292 final MessageProducer temp = messageProducer;
293 messageProducer = null;
294 try {
295 temp.close();
296 return true;
297 } catch (final JMSException e) {
298 StatusLogger.getLogger().debug(
299 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
300 e.getLocalizedMessage(), temp, e);
301 return false;
302 }
303 }
304
305 private boolean closeSession() {
306 if (session == null) {
307 return true;
308 }
309 final Session temp = session;
310 session = null;
311 try {
312 temp.close();
313 return true;
314 } catch (final JMSException e) {
315 StatusLogger.getLogger().debug(
316 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
317 e.getLocalizedMessage(), temp, e);
318 return false;
319 }
320 }
321
322 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
323 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
324 if (configuration.getUserName() != null && configuration.getPassword() != null) {
325 return connectionFactory.createConnection(configuration.getUserName(),
326 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
327 }
328 return connectionFactory.createConnection();
329
330 }
331
332 private Destination createDestination(final JndiManager jndiManager) throws NamingException {
333 return jndiManager.lookup(configuration.getDestinationName());
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357 public Message createMessage(final Serializable object) throws JMSException {
358 if (object instanceof String) {
359 return this.session.createTextMessage((String) object);
360 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
361 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
362 }
363 return this.session.createObjectMessage(object);
364 }
365
366 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
367 final Message message = createMessage(serializable);
368 message.setJMSTimestamp(event.getTimeMillis());
369 messageProducer.send(message);
370 }
371
372
373
374
375
376
377
378 public MessageConsumer createMessageConsumer() throws JMSException {
379 return this.session.createConsumer(this.destination);
380 }
381
382
383
384
385
386
387
388
389
390
391
392 public MessageProducer createMessageProducer(final Session session, final Destination destination)
393 throws JMSException {
394 return session.createProducer(destination);
395 }
396
397 private Reconnector createReconnector() {
398 final Reconnector recon = new Reconnector(this);
399 recon.setDaemon(true);
400 recon.setPriority(Thread.MIN_PRIORITY);
401 return recon;
402 }
403
404 private Session createSession(final Connection connection) throws JMSException {
405 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
406 }
407
408 public JmsManagerConfiguration getJmsManagerConfiguration() {
409 return configuration;
410 }
411
412 JndiManager getJndiManager() {
413 return configuration.getJndiManager();
414 }
415
416 <T> T lookup(final String destinationName) throws NamingException {
417 return this.jndiManager.lookup(destinationName);
418 }
419
420 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
421 final MapMessage jmsMapMessage) {
422
423 log4jMapMessage.forEach(new BiConsumer<String, Object>() {
424 @Override
425 public void accept(final String key, final Object value) {
426 try {
427 jmsMapMessage.setObject(key, value);
428 } catch (final JMSException e) {
429 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
430 e.getClass(), key, value, e.getLocalizedMessage()), e);
431 }
432 }
433 });
434 return jmsMapMessage;
435 }
436
437 @Override
438 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
439 if (reconnector != null) {
440 reconnector.shutdown();
441 reconnector.interrupt();
442 reconnector = null;
443 }
444 boolean closed = false;
445 closed &= closeJndiManager();
446 closed &= closeMessageProducer();
447 closed &= closeSession();
448 closed &= closeConnection();
449 return closed && this.jndiManager.stop(timeout, timeUnit);
450 }
451
452 void send(final LogEvent event, final Serializable serializable) {
453 if (messageProducer == null) {
454 if (reconnector != null && !configuration.isImmediateFail()) {
455 reconnector.latch();
456 }
457 if (messageProducer == null) {
458 throw new AppenderLoggingException(
459 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
460 }
461 }
462 synchronized (this) {
463 try {
464 createMessageAndSend(event, serializable);
465 } catch (final JMSException causeEx) {
466 if (configuration.isRetry() && reconnector == null) {
467 reconnector = createReconnector();
468 try {
469 closeJndiManager();
470 reconnector.reconnect();
471 } catch (NamingException | JMSException reconnEx) {
472 LOGGER.debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
473 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
474 reconnector.start();
475 throw new AppenderLoggingException(
476 String.format("Error sending to %s for %s", getName(), configuration), causeEx);
477 }
478 try {
479 createMessageAndSend(event, serializable);
480 } catch (final JMSException e) {
481 throw new AppenderLoggingException(
482 String.format("Error sending to %s after reestablishing connection for %s", getName(),
483 configuration),
484 causeEx);
485 }
486 }
487 }
488 }
489 }
490
491 }