1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.logging;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.log4j.AppenderSkeleton;
40 import org.apache.log4j.Level;
41 import org.apache.log4j.spi.LoggingEvent;
42 import org.apache.mina.core.buffer.IoBuffer;
43 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
44 import org.apache.mina.core.filterchain.IoFilterAdapter;
45 import org.apache.mina.core.future.ConnectFuture;
46 import org.apache.mina.core.service.IoHandlerAdapter;
47 import org.apache.mina.core.session.IdleStatus;
48 import org.apache.mina.core.session.IoSession;
49 import org.apache.mina.filter.codec.ProtocolCodecFactory;
50 import org.apache.mina.filter.codec.ProtocolCodecFilter;
51 import org.apache.mina.filter.codec.ProtocolDecoder;
52 import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
53 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
54 import org.apache.mina.filter.codec.ProtocolEncoder;
55 import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
56 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
57 import org.apache.mina.filter.executor.ExecutorFilter;
58 import org.apache.mina.filter.statistic.ProfilerTimerFilter;
59 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
60 import org.apache.mina.transport.socket.nio.NioSocketConnector;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67
68
69
70
71
72 public class MdcInjectionFilterTest {
73
74 static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
75
76 private static final int TIMEOUT = 5000;
77
78 private final MyAppender appender = new MyAppender();
79
80 private int port;
81
82 private NioSocketAcceptor acceptor;
83
84 private Level previousLevelRootLogger;
85
86 private ExecutorFilter executorFilter1;
87
88 private ExecutorFilter executorFilter2;
89
90 @Before
91 public void setUp() throws Exception {
92
93 org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
94 previousLevelRootLogger = org.apache.log4j.Logger.getRootLogger().getLevel();
95 org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
96 org.apache.log4j.Logger.getRootLogger().addAppender(appender);
97 acceptor = new NioSocketAcceptor();
98 }
99
100 @After
101 public void tearDown() throws Exception {
102 acceptor.dispose(true);
103 org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
104
105 destroy(executorFilter1);
106 destroy(executorFilter2);
107
108 List<String> after = getThreadNames();
109
110
111 Thread.sleep(50);
112 after = getThreadNames();
113
114 int count = 0;
115
116
117
118
119
120 while (contains(after, "Nio") && count++ < 10) {
121 Thread.sleep(50);
122 after = getThreadNames();
123 }
124
125 while (contains(after, "pool") && count++ < 10) {
126 Thread.sleep(50);
127 after = getThreadNames();
128 }
129
130
131
132 appender.clear();
133 }
134
135 private void destroy(ExecutorFilter executorFilter) throws InterruptedException {
136 if (executorFilter != null) {
137 ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
138 executor.shutdown();
139 while (!executor.isTerminated()) {
140
141 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
142 }
143 }
144 }
145
146 @Test
147 public void testSimpleChain() throws IOException, InterruptedException {
148 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
149 chain.addFirst("mdc-injector", new MdcInjectionFilter());
150 chain.addLast("dummy", new DummyIoFilter());
151 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
152 test(chain);
153 }
154
155 @Test
156 public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
157 executorFilter1 = new ExecutorFilter();
158 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
159 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
160 chain.addFirst("mdc-injector1", mdcInjectionFilter);
161 chain.addLast("dummy", new DummyIoFilter());
162 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
163 chain.addLast("executor", executorFilter1);
164 chain.addLast("mdc-injector2", mdcInjectionFilter);
165 test(chain);
166 }
167
168 @Test
169 public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
170 executorFilter1 = new ExecutorFilter();
171 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
172 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
173 chain.addLast("executor", executorFilter1);
174 chain.addLast("mdc-injector", mdcInjectionFilter);
175 chain.addLast("dummy", new DummyIoFilter());
176 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
177 test(chain);
178 }
179
180 @Test
181 public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
182 executorFilter1 = new ExecutorFilter();
183 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
184 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
185 chain.addLast("executor", executorFilter1);
186 chain.addLast("mdc-injector", mdcInjectionFilter);
187 chain.addLast("dummy", new DummyIoFilter());
188 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
189 test(chain);
190 }
191
192 @Test
193 public void testMultipleFilters() throws IOException, InterruptedException {
194 executorFilter1 = new ExecutorFilter();
195 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
196 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
197 chain.addLast("executor", executorFilter1);
198 chain.addLast("mdc-injector", mdcInjectionFilter);
199 chain.addLast("profiler", new ProfilerTimerFilter());
200 chain.addLast("dummy", new DummyIoFilter());
201 chain.addLast("logger", new LoggingFilter());
202 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
203 test(chain);
204 }
205
206 @Test
207 public void testTwoExecutorFilters() throws IOException, InterruptedException {
208 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
209 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
210 executorFilter1 = new ExecutorFilter();
211 executorFilter2 = new ExecutorFilter();
212 chain.addLast("executorFilter1", executorFilter1);
213 chain.addLast("mdc-injector1", mdcInjectionFilter);
214 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
215 chain.addLast("dummy", new DummyIoFilter());
216 chain.addLast("executorFilter2", executorFilter2);
217
218
219 chain.addLast("mdc-injector2", mdcInjectionFilter);
220 test(chain);
221 }
222
223 @Test
224 public void testOnlyRemoteAddress() throws IOException, InterruptedException {
225 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
226 chain.addFirst("mdc-injector", new MdcInjectionFilter(MdcInjectionFilter.MdcKey.remoteAddress));
227 chain.addLast("dummy", new DummyIoFilter());
228 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
229 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
230 acceptor.setHandler(simpleIoHandler);
231 acceptor.bind(new InetSocketAddress(0));
232 port = acceptor.getLocalAddress().getPort();
233 acceptor.setFilterChainBuilder(chain);
234
235 NioSocketConnector connector = new NioSocketConnector();
236 connector.setHandler(new IoHandlerAdapter());
237 connectAndWrite(connector, 0);
238 connectAndWrite(connector, 1);
239
240 simpleIoHandler.messageSentLatch.await();
241 simpleIoHandler.sessionIdleLatch.await();
242 simpleIoHandler.sessionClosedLatch.await();
243 connector.dispose(true);
244
245
246 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
247
248 for (LoggingEvent event : events) {
249 if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
250 continue;
251 }
252 for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
253 String key = mdcKey.name();
254 Object value = event.getMDC(key);
255 if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
256 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
257 } else {
258 assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
259 }
260 }
261 }
262 }
263
264 private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
265
266 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
267 acceptor.setHandler(simpleIoHandler);
268 acceptor.bind(new InetSocketAddress(0));
269 port = acceptor.getLocalAddress().getPort();
270 acceptor.setFilterChainBuilder(chain);
271
272 NioSocketConnector connector = new NioSocketConnector();
273 connector.setHandler(new IoHandlerAdapter());
274 SocketAddress remoteAddressClients[] = new SocketAddress[2];
275 remoteAddressClients[0] = connectAndWrite(connector, 0);
276 remoteAddressClients[1] = connectAndWrite(connector, 1);
277
278 simpleIoHandler.messageSentLatch.await();
279 simpleIoHandler.sessionIdleLatch.await();
280 simpleIoHandler.sessionClosedLatch.await();
281 connector.dispose(true);
282
283
284 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
285
286 Set<String> loggersToCheck = new HashSet<String>();
287 loggersToCheck.add(MdcInjectionFilterTest.class.getName());
288 loggersToCheck.add(ProtocolCodecFilter.class.getName());
289 loggersToCheck.add(LoggingFilter.class.getName());
290
291
292 for (LoggingEvent event : events) {
293
294 if (loggersToCheck.contains(event.getLoggerName())) {
295 Object remoteAddress = event.getMDC("remoteAddress");
296 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
297 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
298 assertEquals("every event should have MDC[handlerClass]", SimpleIoHandler.class.getName(),
299 event.getMDC("handlerClass"));
300 }
301 }
302
303 for (int i = 0; i < remoteAddressClients.length; i++) {
304 SocketAddress remoteAddressClient = remoteAddressClients[i];
305 assertEventExists(events, "sessionCreated", remoteAddressClient, null);
306 assertEventExists(events, "sessionOpened", remoteAddressClient, null);
307 assertEventExists(events, "decode", remoteAddressClient, null);
308 assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
309 assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
310 assertEventExists(events, "encode", remoteAddressClient, null);
311 assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
312 assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
313 assertEventExists(events, "messageSent-2", remoteAddressClient, null);
314 assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
315 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
316 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
317 assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
318 }
319 }
320
321 private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
322 ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
323 connectFuture.awaitUninterruptibly(TIMEOUT);
324 IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
325 IoSession session = connectFuture.getSession();
326 session.write(message).awaitUninterruptibly(TIMEOUT);
327 return session.getLocalAddress();
328 }
329
330 private void assertEventExists(List<LoggingEvent> events, String message, SocketAddress address, String user) {
331 InetSocketAddress remoteAddress = (InetSocketAddress) address;
332 for (LoggingEvent event : events) {
333 if (event.getMessage().equals(message) && event.getMDC("remoteAddress").equals(remoteAddress.toString())
334 && event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress())
335 && event.getMDC("remotePort").equals(remoteAddress.getPort() + "")) {
336 if (user == null && event.getMDC("user") == null) {
337 return;
338 }
339 if (user != null && user.equals(event.getMDC("user"))) {
340 return;
341 }
342 return;
343 }
344 }
345 fail("No LoggingEvent found from [" + remoteAddress + "] with message [" + message + "]");
346 }
347
348 private static class SimpleIoHandler extends IoHandlerAdapter {
349 CountDownLatch sessionIdleLatch = new CountDownLatch(2);
350
351 CountDownLatch sessionClosedLatch = new CountDownLatch(2);
352
353 CountDownLatch messageSentLatch = new CountDownLatch(2);
354
355
356
357
358 public SimpleIoHandler() {
359 super();
360 }
361
362 @Override
363 public void sessionCreated(IoSession session) throws Exception {
364 LOGGER.info("sessionCreated");
365 session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
366 }
367
368 @Override
369 public void sessionOpened(IoSession session) throws Exception {
370 LOGGER.info("sessionOpened");
371 }
372
373 @Override
374 public void sessionClosed(IoSession session) throws Exception {
375 LOGGER.info("sessionClosed");
376 sessionClosedLatch.countDown();
377 }
378
379 @Override
380 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
381 LOGGER.info("sessionIdle");
382 sessionIdleLatch.countDown();
383 session.closeNow();
384 }
385
386 @Override
387 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
388 LOGGER.info("exceptionCaught", cause);
389 }
390
391 @Override
392 public void messageReceived(IoSession session, Object message) throws Exception {
393 LOGGER.info("messageReceived-1");
394
395 String user = "user-" + message;
396 MdcInjectionFilter.setProperty(session, "user", user);
397 LOGGER.info("messageReceived-2");
398 session.getService().broadcast(message);
399 throw new RuntimeException("just a test, forcing exceptionCaught");
400 }
401
402 @Override
403 public void messageSent(IoSession session, Object message) throws Exception {
404 LOGGER.info("messageSent-1");
405 MdcInjectionFilter.removeProperty(session, "user");
406 LOGGER.info("messageSent-2");
407 messageSentLatch.countDown();
408 }
409 }
410
411 private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
412
413
414
415 public DummyProtocolCodecFactory() {
416 super();
417 }
418
419 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
420 return new ProtocolEncoderAdapter() {
421 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
422 LOGGER.info("encode");
423 IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
424 out.write(buffer);
425 }
426 };
427 }
428
429 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
430 return new ProtocolDecoderAdapter() {
431 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
432 if (in.remaining() >= 4) {
433 int value = in.getInt();
434 LOGGER.info("decode");
435 out.write(value);
436 }
437 }
438 };
439 }
440 }
441
442 private static class MyAppender extends AppenderSkeleton {
443 List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
444
445
446
447
448 public MyAppender() {
449 super();
450 }
451
452 public void clear() {
453 events.clear();
454 }
455
456 @Override
457 protected void append(final LoggingEvent loggingEvent) {
458 loggingEvent.getMDCCopy();
459 events.add(loggingEvent);
460 }
461
462 public boolean requiresLayout() {
463 return false;
464 }
465
466 public void close() {
467
468 }
469 }
470
471 static class DummyIoFilter extends IoFilterAdapter {
472 @Override
473 public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
474 LOGGER.info("DummyIoFilter.sessionOpened");
475 nextFilter.sessionOpened(session);
476 }
477 }
478
479 private List<String> getThreadNames() {
480 List<String> list = new ArrayList<String>();
481 int active = Thread.activeCount();
482 Thread[] threads = new Thread[active];
483 Thread.enumerate(threads);
484 for (Thread thread : threads) {
485 try {
486 String name = thread.getName();
487 list.add(name);
488 } catch (NullPointerException ignore) {
489 }
490 }
491 return list;
492 }
493
494 private boolean contains(List<String> list, String search) {
495 for (String s : list) {
496 if (s.contains(search)) {
497 return true;
498 }
499 }
500 return false;
501 }
502 }