1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.logging;
21
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.statistic.ProfilerTimerFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63 public class MdcInjectionFilterTest extends TestCase {
64
65 private static Logger logger = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
66 private static final int TIMEOUT = 5000;
67
68 private final MyAppender appender = new MyAppender();
69 private int port;
70 private NioSocketAcceptor acceptor;
71
72 @Override
73 protected void setUp() throws Exception {
74 super.setUp();
75
76 org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
77 org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
78 org.apache.log4j.Logger.getRootLogger().addAppender(appender);
79 acceptor = new NioSocketAcceptor();
80 }
81
82
83 @Override
84 protected void tearDown() throws Exception {
85 acceptor.dispose();
86 super.tearDown();
87 }
88
89 public void testSimpleChain() throws IOException, InterruptedException {
90 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
91 chain.addFirst("mdc-injector", new MdcInjectionFilter());
92 chain.addLast("dummy", new DummyIoFilter());
93 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
94 test(chain);
95 }
96
97 public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
98 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
99 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
100 chain.addFirst("mdc-injector1", mdcInjectionFilter);
101 chain.addLast("dummy", new DummyIoFilter());
102 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
103 chain.addLast("executor" , new ExecutorFilter());
104 chain.addLast("mdc-injector2", mdcInjectionFilter);
105 test(chain);
106 }
107
108 public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
109 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
110 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
111 chain.addLast("executor" , new ExecutorFilter());
112 chain.addLast("mdc-injector", mdcInjectionFilter);
113 chain.addLast("dummy", new DummyIoFilter());
114 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
115 test(chain);
116 }
117
118 public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
119 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
120 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
121 chain.addLast("executor" , new ExecutorFilter());
122 chain.addLast("mdc-injector", mdcInjectionFilter);
123 chain.addLast("dummy", new DummyIoFilter());
124 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
125 test(chain);
126 }
127
128 public void testMultipleFilters() throws IOException, InterruptedException {
129 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
130 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
131 chain.addLast("executor" , new ExecutorFilter());
132 chain.addLast("mdc-injector", mdcInjectionFilter);
133 chain.addLast("profiler", new ProfilerTimerFilter());
134 chain.addLast("dummy", new DummyIoFilter());
135 chain.addLast("logger", new LoggingFilter());
136 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
137 test(chain);
138 }
139
140
141 public void testTwoExecutorFilters() throws IOException, InterruptedException {
142 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
143 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
144 chain.addLast("executor1" , new ExecutorFilter());
145 chain.addLast("mdc-injector1", mdcInjectionFilter);
146 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
147 chain.addLast("dummy", new DummyIoFilter());
148 chain.addLast("executor2" , new ExecutorFilter());
149
150
151 chain.addLast("mdc-injector2", mdcInjectionFilter);
152 test(chain);
153 }
154
155 public void testOnlyRemoteAddress() throws IOException, InterruptedException {
156 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
157 chain.addFirst("mdc-injector", new MdcInjectionFilter(
158 MdcInjectionFilter.MdcKey.remoteAddress));
159 chain.addLast("dummy", new DummyIoFilter());
160 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
161 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
162 acceptor.setHandler(simpleIoHandler);
163 acceptor.bind(new InetSocketAddress(0));
164 port = acceptor.getLocalAddress().getPort();
165 acceptor.setFilterChainBuilder(chain);
166
167 NioSocketConnector connector = new NioSocketConnector();
168 connector.setHandler(new IoHandlerAdapter());
169 connectAndWrite(connector,0);
170 connectAndWrite(connector,1);
171
172 simpleIoHandler.messageSentLatch.await();
173 simpleIoHandler.sessionIdleLatch.await();
174 simpleIoHandler.sessionClosedLatch.await();
175 connector.dispose();
176
177
178 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
179
180 for (LoggingEvent event : events) {
181 for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
182 String key = mdcKey.name();
183 Object value = event.getMDC(key);
184 if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
185 assertNotNull(
186 "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
187 } else {
188 assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
189 }
190 }
191 }
192 }
193
194 private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
195
196 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
197 acceptor.setHandler(simpleIoHandler);
198 acceptor.bind(new InetSocketAddress(0));
199 port = acceptor.getLocalAddress().getPort();
200 acceptor.setFilterChainBuilder(chain);
201
202 NioSocketConnector connector = new NioSocketConnector();
203 connector.setHandler(new IoHandlerAdapter());
204 SocketAddress remoteAddressClients[] = new SocketAddress[2];
205 remoteAddressClients[0] = connectAndWrite(connector,0);
206 remoteAddressClients[1] = connectAndWrite(connector,1);
207
208 simpleIoHandler.messageSentLatch.await();
209 simpleIoHandler.sessionIdleLatch.await();
210 simpleIoHandler.sessionClosedLatch.await();
211 connector.dispose();
212
213
214 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
215
216
217 for (LoggingEvent event : events) {
218 if (!ExecutorFilter.class.getName().equals(event.getLoggerName())) {
219 Object remoteAddress = event.getMDC("remoteAddress");
220 assertNotNull(
221 "MDC[remoteAddress] not set for [" + event.getMessage() + "]",
222 remoteAddress);
223 assertNotNull(
224 "MDC[remotePort] not set for [" + event.getMessage() + "]",
225 event.getMDC("remotePort"));
226 assertEquals(
227 "every event should have MDC[handlerClass]",
228 SimpleIoHandler.class.getName(),
229 event.getMDC("handlerClass") );
230 }
231 }
232
233 for (int i = 0; i < remoteAddressClients.length; i++) {
234 SocketAddress remoteAddressClient = remoteAddressClients[i];
235 assertEventExists(events, "sessionCreated", remoteAddressClient, null);
236 assertEventExists(events, "sessionOpened", remoteAddressClient, null);
237 assertEventExists(events, "decode", remoteAddressClient, null);
238 assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
239 assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
240 assertEventExists(events, "encode", remoteAddressClient, null);
241 assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
242 assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
243 assertEventExists(events, "messageSent-2", remoteAddressClient, null);
244 assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
245 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
246 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
247 assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
248 }
249 }
250
251 private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
252 ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
253 connectFuture.awaitUninterruptibly(TIMEOUT);
254 IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
255 IoSession session = connectFuture.getSession();
256 session.write(message).awaitUninterruptibly(TIMEOUT);
257 return session.getLocalAddress();
258 }
259
260 private void assertEventExists(List<LoggingEvent> events,
261 String message,
262 SocketAddress address,
263 String user) {
264 InetSocketAddress remoteAddress = (InetSocketAddress) address;
265 for (LoggingEvent event : events) {
266 if (event.getMessage().equals(message) &&
267 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
268 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
269 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
270 if (user == null && event.getMDC("user") == null) {
271 return;
272 }
273 if (user != null && user.equals(event.getMDC("user"))) {
274 return;
275 }
276 return;
277 }
278 }
279 fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
280 }
281
282 private static class SimpleIoHandler extends IoHandlerAdapter {
283
284 CountDownLatch sessionIdleLatch = new CountDownLatch(2);
285 CountDownLatch sessionClosedLatch = new CountDownLatch(2);
286 CountDownLatch messageSentLatch = new CountDownLatch(2);
287
288 @Override
289 public void sessionCreated(IoSession session) throws Exception {
290 logger.info("sessionCreated");
291 session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
292 }
293
294 @Override
295 public void sessionOpened(IoSession session) throws Exception {
296 logger.info("sessionOpened");
297 }
298
299 @Override
300 public void sessionClosed(IoSession session) throws Exception {
301 logger.info("sessionClosed");
302 sessionClosedLatch.countDown();
303 }
304
305 @Override
306 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
307 logger.info("sessionIdle");
308 sessionIdleLatch.countDown();
309 session.close();
310 }
311
312 @Override
313 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
314 logger.info("exceptionCaught", cause);
315 }
316
317 @Override
318 public void messageReceived(IoSession session, Object message) throws Exception {
319 logger.info("messageReceived-1");
320
321 String user = "user-" + message;
322 MdcInjectionFilter.setProperty(session, "user", user);
323 logger.info("messageReceived-2");
324 session.getService().broadcast(message);
325 throw new RuntimeException("just a test, forcing exceptionCaught");
326 }
327
328 @Override
329 public void messageSent(IoSession session, Object message) throws Exception {
330 logger.info("messageSent-1");
331 MdcInjectionFilter.removeProperty(session, "user");
332 logger.info("messageSent-2");
333 messageSentLatch.countDown();
334 }
335 }
336
337 private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
338
339 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
340 return new ProtocolEncoderAdapter() {
341 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
342 logger.info("encode");
343 IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
344 out.write(buffer);
345 }
346 };
347 }
348
349 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
350 return new ProtocolDecoderAdapter() {
351 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
352 if (in.remaining() >= 4) {
353 int value = in.getInt();
354 logger.info("decode");
355 out.write(value);
356 }
357 }
358 };
359 }
360 }
361
362 private static class MyAppender extends AppenderSkeleton {
363
364 List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
365
366 @Override
367 protected void append(final LoggingEvent loggingEvent) {
368 loggingEvent.getMDCCopy();
369 events.add(loggingEvent);
370 }
371
372 @Override
373 public boolean requiresLayout() {
374 return false;
375 }
376
377 @Override
378 public void close() {
379 }
380 }
381
382 private static class DummyIoFilter extends IoFilterAdapter {
383 @Override
384 public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
385 logger.info("DummyIoFilter.sessionOpened");
386 nextFilter.sessionOpened(session);
387 }
388 }
389
390 }