1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.net.server;
19
20 import java.io.BufferedReader;
21 import java.io.IOException;
22 import java.io.InputStreamReader;
23 import java.nio.charset.Charset;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageListener;
29 import javax.jms.ObjectMessage;
30
31 import org.apache.logging.log4j.LoggingException;
32 import org.apache.logging.log4j.core.LifeCycle;
33 import org.apache.logging.log4j.core.LogEvent;
34 import org.apache.logging.log4j.core.LogEventListener;
35 import org.apache.logging.log4j.core.appender.mom.JmsManager;
36 import org.apache.logging.log4j.core.net.JndiManager;
37
38
39
40
41
42
43 public class JmsServer extends LogEventListener implements MessageListener, LifeCycle {
44
45 private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
46 private final JmsManager jmsManager;
47 private MessageConsumer messageConsumer;
48
49 public JmsServer(final String connectionFactoryBindingName,
50 final String destinationBindingName,
51 final String username,
52 final String password) {
53 final String managerName = JmsServer.class.getName() + '@' + JmsServer.class.hashCode();
54 final JndiManager jndiManager = JndiManager.getDefaultManager(managerName);
55 jmsManager = JmsManager.getJmsManager(managerName, jndiManager, connectionFactoryBindingName,
56 destinationBindingName, username, password);
57 }
58
59 @Override
60 public State getState() {
61 return state.get();
62 }
63
64 @Override
65 public void onMessage(final Message message) {
66 try {
67 if (message instanceof ObjectMessage) {
68 final Object body = ((ObjectMessage) message).getObject();
69 if (body instanceof LogEvent) {
70 log((LogEvent) body);
71 } else {
72 LOGGER.warn("Expected ObjectMessage to contain LogEvent. Got type {} instead.", body.getClass());
73 }
74 } else {
75 LOGGER.warn("Received message of type {} and JMSType {} which cannot be handled.", message.getClass(),
76 message.getJMSType());
77 }
78 } catch (final JMSException e) {
79 LOGGER.catching(e);
80 }
81 }
82
83 @Override
84 public void initialize() {
85 }
86
87 @Override
88 public void start() {
89 if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
90 try {
91 messageConsumer = jmsManager.createMessageConsumer();
92 messageConsumer.setMessageListener(this);
93 } catch (final JMSException e) {
94 throw new LoggingException(e);
95 }
96 }
97 }
98
99 @Override
100 public void stop() {
101 try {
102 messageConsumer.close();
103 } catch (final JMSException ignored) {
104 }
105 jmsManager.release();
106 }
107
108 @Override
109 public boolean isStarted() {
110 return state.get() == State.STARTED;
111 }
112
113 @Override
114 public boolean isStopped() {
115 return state.get() == State.STOPPED;
116 }
117
118
119
120
121
122
123
124 public void run() throws IOException {
125 this.start();
126 System.out.println("Type \"exit\" to quit.");
127 final BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, Charset.defaultCharset()));
128 while (true) {
129 final String line = stdin.readLine();
130 if (line == null || line.equalsIgnoreCase("exit")) {
131 System.out.println("Exiting. Kill the application if it does not exit due to daemon threads.");
132 this.stop();
133 return;
134 }
135 }
136 }
137
138 }