1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.net.server;
19
20 import java.io.BufferedReader;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.nio.charset.Charset;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28 import javax.jms.MessageConsumer;
29 import javax.jms.MessageListener;
30 import javax.jms.ObjectMessage;
31
32 import org.apache.logging.log4j.LoggingException;
33 import org.apache.logging.log4j.core.AbstractLifeCycle;
34 import org.apache.logging.log4j.core.LifeCycle2;
35 import org.apache.logging.log4j.core.LogEvent;
36 import org.apache.logging.log4j.core.LogEventListener;
37 import org.apache.logging.log4j.core.appender.mom.JmsManager;
38 import org.apache.logging.log4j.core.net.JndiManager;
39
40
41
42
43
44
45 public class JmsServer extends LogEventListener implements MessageListener, LifeCycle2 {
46
47 private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
48 private final JmsManager jmsManager;
49 private MessageConsumer messageConsumer;
50
51 public JmsServer(final String connectionFactoryBindingName,
52 final String destinationBindingName,
53 final String username,
54 final String password) {
55 final String managerName = JmsServer.class.getName() + '@' + JmsServer.class.hashCode();
56 final JndiManager jndiManager = JndiManager.getDefaultManager(managerName);
57 jmsManager = JmsManager.getJmsManager(managerName, jndiManager, connectionFactoryBindingName,
58 destinationBindingName, username, password);
59 }
60
61 @Override
62 public State getState() {
63 return state.get();
64 }
65
66 @Override
67 public void onMessage(final Message message) {
68 try {
69 if (message instanceof ObjectMessage) {
70 final Object body = ((ObjectMessage) message).getObject();
71 if (body instanceof LogEvent) {
72 log((LogEvent) body);
73 } else {
74 LOGGER.warn("Expected ObjectMessage to contain LogEvent. Got type {} instead.", body.getClass());
75 }
76 } else {
77 LOGGER.warn("Received message of type {} and JMSType {} which cannot be handled.", message.getClass(),
78 message.getJMSType());
79 }
80 } catch (final JMSException e) {
81 LOGGER.catching(e);
82 }
83 }
84
85 @Override
86 public void initialize() {
87 }
88
89 @Override
90 public void start() {
91 if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
92 try {
93 messageConsumer = jmsManager.createMessageConsumer();
94 messageConsumer.setMessageListener(this);
95 } catch (final JMSException e) {
96 throw new LoggingException(e);
97 }
98 }
99 }
100
101 @Override
102 public void stop() {
103 stop(AbstractLifeCycle.DEFAULT_STOP_TIMEOUT, AbstractLifeCycle.DEFAULT_STOP_TIMEUNIT);
104 }
105
106 @Override
107 public boolean stop(final long timeout, final TimeUnit timeUnit) {
108 boolean stopped = true;
109 try {
110 messageConsumer.close();
111 } catch (final JMSException e) {
112 LOGGER.debug("Exception closing {}", messageConsumer, e);
113 stopped = false;
114 }
115 return stopped && jmsManager.stop(timeout, timeUnit);
116 }
117
118 @Override
119 public boolean isStarted() {
120 return state.get() == State.STARTED;
121 }
122
123 @Override
124 public boolean isStopped() {
125 return state.get() == State.STOPPED;
126 }
127
128
129
130
131
132
133
134 public void run() throws IOException {
135 this.start();
136 System.out.println("Type \"exit\" to quit.");
137 final BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, Charset.defaultCharset()));
138 while (true) {
139 final String line = stdin.readLine();
140 if (line == null || line.equalsIgnoreCase("exit")) {
141 System.out.println("Exiting. Kill the application if it does not exit due to daemon threads.");
142 this.stop();
143 return;
144 }
145 }
146 }
147
148 }