1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.logging;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.log4j.AppenderSkeleton;
40 import org.apache.log4j.Level;
41 import org.apache.log4j.spi.LoggingEvent;
42 import org.apache.mina.core.buffer.IoBuffer;
43 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
44 import org.apache.mina.core.filterchain.IoFilterAdapter;
45 import org.apache.mina.core.future.ConnectFuture;
46 import org.apache.mina.core.service.IoHandlerAdapter;
47 import org.apache.mina.core.session.IdleStatus;
48 import org.apache.mina.core.session.IoSession;
49 import org.apache.mina.filter.codec.ProtocolCodecFactory;
50 import org.apache.mina.filter.codec.ProtocolCodecFilter;
51 import org.apache.mina.filter.codec.ProtocolDecoder;
52 import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
53 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
54 import org.apache.mina.filter.codec.ProtocolEncoder;
55 import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
56 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
57 import org.apache.mina.filter.executor.ExecutorFilter;
58 import org.apache.mina.filter.statistic.ProfilerTimerFilter;
59 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
60 import org.apache.mina.transport.socket.nio.NioSocketConnector;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67
68
69
70
71
72 public class MdcInjectionFilterTest {
73
74 static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
75 private static final int TIMEOUT = 5000;
76
77 private final MyAppender appender = new MyAppender();
78 private int port;
79 private NioSocketAcceptor acceptor;
80
81 private Level previousLevelRootLogger;
82 private ExecutorFilter executorFilter1;
83 private ExecutorFilter executorFilter2;
84
85 @Before
86 public void setUp() throws Exception {
87
88 org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
89 previousLevelRootLogger = org.apache.log4j.Logger.getRootLogger().getLevel();
90 org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
91 org.apache.log4j.Logger.getRootLogger().addAppender(appender);
92 acceptor = new NioSocketAcceptor();
93 }
94
95
96 @After
97 public void tearDown() throws Exception {
98 acceptor.dispose(true);
99 org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
100
101 destroy(executorFilter1);
102 destroy(executorFilter2);
103
104 List<String> after = getThreadNames();
105
106
107 Thread.sleep(50);
108 after = getThreadNames();
109
110 int count = 0;
111
112
113
114
115
116 while (contains(after, "Nio") && count++ < 10) {
117 Thread.sleep(50);
118 after = getThreadNames();
119 System.out.println("** after = " + after);
120 }
121 System.out.println("============================");
122
123 while (contains(after, "pool") && count++ < 10) {
124 Thread.sleep(50);
125 after = getThreadNames();
126 System.out.println("** after = " + after);
127 }
128 System.out.println("============================");
129
130
131
132 appender.clear();
133 }
134
135 private void destroy(ExecutorFilter executorFilter) throws InterruptedException {
136 if (executorFilter != null) {
137 ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
138 executor.shutdown();
139 while (!executor.isTerminated()) {
140
141 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
142 }
143 }
144 }
145
146 @Test
147 public void testSimpleChain() throws IOException, InterruptedException {
148 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
149 chain.addFirst("mdc-injector", new MdcInjectionFilter());
150 chain.addLast("dummy", new DummyIoFilter());
151 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
152 test(chain);
153 }
154
155 @Test
156 public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
157 executorFilter1 = new ExecutorFilter();
158 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
159 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
160 chain.addFirst("mdc-injector1", mdcInjectionFilter);
161 chain.addLast("dummy", new DummyIoFilter());
162 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
163 chain.addLast("executor" , executorFilter1);
164 chain.addLast("mdc-injector2", mdcInjectionFilter);
165 test(chain);
166 }
167
168 @Test
169 public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
170 executorFilter1 = new ExecutorFilter();
171 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
172 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
173 chain.addLast("executor" , executorFilter1);
174 chain.addLast("mdc-injector", mdcInjectionFilter);
175 chain.addLast("dummy", new DummyIoFilter());
176 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
177 test(chain);
178 }
179
180 @Test
181 public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
182 executorFilter1 = new ExecutorFilter();
183 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
184 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
185 chain.addLast("executor" , executorFilter1);
186 chain.addLast("mdc-injector", mdcInjectionFilter);
187 chain.addLast("dummy", new DummyIoFilter());
188 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
189 test(chain);
190 }
191
192 @Test
193 public void testMultipleFilters() throws IOException, InterruptedException {
194 executorFilter1 = new ExecutorFilter();
195 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
196 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
197 chain.addLast("executor" , executorFilter1);
198 chain.addLast("mdc-injector", mdcInjectionFilter);
199 chain.addLast("profiler", new ProfilerTimerFilter());
200 chain.addLast("dummy", new DummyIoFilter());
201 chain.addLast("logger", new LoggingFilter());
202 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
203 test(chain);
204 }
205
206 @Test
207 public void testTwoExecutorFilters() throws IOException, InterruptedException {
208 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
209 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
210 executorFilter1 = new ExecutorFilter();
211 executorFilter2 = new ExecutorFilter();
212 chain.addLast("executorFilter1" , executorFilter1);
213 chain.addLast("mdc-injector1", mdcInjectionFilter);
214 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
215 chain.addLast("dummy", new DummyIoFilter());
216 chain.addLast("executorFilter2" , executorFilter2);
217
218
219 chain.addLast("mdc-injector2", mdcInjectionFilter);
220 test(chain);
221 }
222
223 @Test
224 public void testOnlyRemoteAddress() throws IOException, InterruptedException {
225 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
226 chain.addFirst("mdc-injector", new MdcInjectionFilter(
227 MdcInjectionFilter.MdcKey.remoteAddress));
228 chain.addLast("dummy", new DummyIoFilter());
229 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
230 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
231 acceptor.setHandler(simpleIoHandler);
232 acceptor.bind(new InetSocketAddress(0));
233 port = acceptor.getLocalAddress().getPort();
234 acceptor.setFilterChainBuilder(chain);
235
236 NioSocketConnector connector = new NioSocketConnector();
237 connector.setHandler(new IoHandlerAdapter());
238 connectAndWrite(connector,0);
239 connectAndWrite(connector,1);
240
241 simpleIoHandler.messageSentLatch.await();
242 simpleIoHandler.sessionIdleLatch.await();
243 simpleIoHandler.sessionClosedLatch.await();
244 connector.dispose(true);
245
246
247 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
248
249 for (LoggingEvent event : events) {
250 if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
251 continue;
252 }
253 for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
254 String key = mdcKey.name();
255 Object value = event.getMDC(key);
256 if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
257 assertNotNull(
258 "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
259 } else {
260 assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
261 }
262 }
263 }
264 }
265
266 private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
267
268 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
269 acceptor.setHandler(simpleIoHandler);
270 acceptor.bind(new InetSocketAddress(0));
271 port = acceptor.getLocalAddress().getPort();
272 acceptor.setFilterChainBuilder(chain);
273
274 NioSocketConnector connector = new NioSocketConnector();
275 connector.setHandler(new IoHandlerAdapter());
276 SocketAddress remoteAddressClients[] = new SocketAddress[2];
277 remoteAddressClients[0] = connectAndWrite(connector,0);
278 remoteAddressClients[1] = connectAndWrite(connector,1);
279
280 simpleIoHandler.messageSentLatch.await();
281 simpleIoHandler.sessionIdleLatch.await();
282 simpleIoHandler.sessionClosedLatch.await();
283 connector.dispose(true);
284
285
286 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
287
288 Set<String> loggersToCheck = new HashSet<String>();
289 loggersToCheck.add(MdcInjectionFilterTest.class.getName());
290 loggersToCheck.add(ProtocolCodecFilter.class.getName());
291 loggersToCheck.add(LoggingFilter.class.getName());
292
293
294 for (LoggingEvent event : events) {
295
296 if (loggersToCheck.contains(event.getLoggerName())) {
297 Object remoteAddress = event.getMDC("remoteAddress");
298 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
299 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
300 assertEquals(
301 "every event should have MDC[handlerClass]",
302 SimpleIoHandler.class.getName(),
303 event.getMDC("handlerClass") );
304 }
305 }
306
307 for (int i = 0; i < remoteAddressClients.length; i++) {
308 SocketAddress remoteAddressClient = remoteAddressClients[i];
309 assertEventExists(events, "sessionCreated", remoteAddressClient, null);
310 assertEventExists(events, "sessionOpened", remoteAddressClient, null);
311 assertEventExists(events, "decode", remoteAddressClient, null);
312 assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
313 assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
314 assertEventExists(events, "encode", remoteAddressClient, null);
315 assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
316 assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
317 assertEventExists(events, "messageSent-2", remoteAddressClient, null);
318 assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
319 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
320 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
321 assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
322 }
323 }
324
325 private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
326 ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
327 connectFuture.awaitUninterruptibly(TIMEOUT);
328 IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
329 IoSession session = connectFuture.getSession();
330 session.write(message).awaitUninterruptibly(TIMEOUT);
331 return session.getLocalAddress();
332 }
333
334 private void assertEventExists(List<LoggingEvent> events,
335 String message,
336 SocketAddress address,
337 String user) {
338 InetSocketAddress remoteAddress = (InetSocketAddress) address;
339 for (LoggingEvent event : events) {
340 if (event.getMessage().equals(message) &&
341 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
342 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
343 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
344 if (user == null && event.getMDC("user") == null) {
345 return;
346 }
347 if (user != null && user.equals(event.getMDC("user"))) {
348 return;
349 }
350 return;
351 }
352 }
353 fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
354 }
355
356 private static class SimpleIoHandler extends IoHandlerAdapter {
357 CountDownLatch sessionIdleLatch = new CountDownLatch(2);
358 CountDownLatch sessionClosedLatch = new CountDownLatch(2);
359 CountDownLatch messageSentLatch = new CountDownLatch(2);
360
361
362
363
364 public SimpleIoHandler() {
365 super();
366 }
367
368 @Override
369 public void sessionCreated(IoSession session) throws Exception {
370 LOGGER.info("sessionCreated");
371 session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
372 }
373
374 @Override
375 public void sessionOpened(IoSession session) throws Exception {
376 LOGGER.info("sessionOpened");
377 }
378
379 @Override
380 public void sessionClosed(IoSession session) throws Exception {
381 LOGGER.info("sessionClosed");
382 sessionClosedLatch.countDown();
383 }
384
385 @Override
386 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
387 LOGGER.info("sessionIdle");
388 sessionIdleLatch.countDown();
389 session.close(true);
390 }
391
392 @Override
393 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
394 LOGGER.info("exceptionCaught", cause);
395 }
396
397 @Override
398 public void messageReceived(IoSession session, Object message) throws Exception {
399 LOGGER.info("messageReceived-1");
400
401 String user = "user-" + message;
402 MdcInjectionFilter.setProperty(session, "user", user);
403 LOGGER.info("messageReceived-2");
404 session.getService().broadcast(message);
405 throw new RuntimeException("just a test, forcing exceptionCaught");
406 }
407
408 @Override
409 public void messageSent(IoSession session, Object message) throws Exception {
410 LOGGER.info("messageSent-1");
411 MdcInjectionFilter.removeProperty(session, "user");
412 LOGGER.info("messageSent-2");
413 messageSentLatch.countDown();
414 }
415 }
416
417 private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
418
419
420
421 public DummyProtocolCodecFactory() {
422 super();
423 }
424
425 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
426 return new ProtocolEncoderAdapter() {
427 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
428 LOGGER.info("encode");
429 IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
430 out.write(buffer);
431 }
432 };
433 }
434
435 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
436 return new ProtocolDecoderAdapter() {
437 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
438 if (in.remaining() >= 4) {
439 int value = in.getInt();
440 LOGGER.info("decode");
441 out.write(value);
442 }
443 }
444 };
445 }
446 }
447
448 private static class MyAppender extends AppenderSkeleton {
449 List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
450
451
452
453
454 public MyAppender() {
455 super();
456 }
457
458 public void clear() {
459 events.clear();
460 }
461
462 @Override
463 protected void append(final LoggingEvent loggingEvent) {
464 loggingEvent.getMDCCopy();
465 events.add(loggingEvent);
466 }
467
468 @Override
469 public boolean requiresLayout() {
470 return false;
471 }
472
473 @Override
474 public void close() {
475
476 }
477 }
478
479 static class DummyIoFilter extends IoFilterAdapter {
480 @Override
481 public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
482 LOGGER.info("DummyIoFilter.sessionOpened");
483 nextFilter.sessionOpened(session);
484 }
485 }
486
487
488 private List<String> getThreadNames() {
489 List<String> list = new ArrayList<String>();
490 int active = Thread.activeCount();
491 Thread[] threads = new Thread[active];
492 Thread.enumerate(threads);
493 for (Thread thread : threads) {
494 try {
495 String name = thread.getName();
496 list.add(name);
497 } catch (NullPointerException ignore) {
498 }
499 }
500 return list;
501 }
502
503 private boolean contains(List<String> list, String search) {
504 for (String s : list) {
505 if (s.contains(search)) {
506 return true;
507 }
508 }
509 return false;
510 }
511 }