1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.logging;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.log4j.AppenderSkeleton;
40 import org.apache.log4j.Level;
41 import org.apache.log4j.spi.LoggingEvent;
42 import org.apache.mina.core.buffer.IoBuffer;
43 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
44 import org.apache.mina.core.filterchain.IoFilterAdapter;
45 import org.apache.mina.core.future.ConnectFuture;
46 import org.apache.mina.core.service.IoHandlerAdapter;
47 import org.apache.mina.core.session.IdleStatus;
48 import org.apache.mina.core.session.IoSession;
49 import org.apache.mina.filter.codec.ProtocolCodecFactory;
50 import org.apache.mina.filter.codec.ProtocolCodecFilter;
51 import org.apache.mina.filter.codec.ProtocolDecoder;
52 import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
53 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
54 import org.apache.mina.filter.codec.ProtocolEncoder;
55 import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
56 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
57 import org.apache.mina.filter.executor.ExecutorFilter;
58 import org.apache.mina.filter.statistic.ProfilerTimerFilter;
59 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
60 import org.apache.mina.transport.socket.nio.NioSocketConnector;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67
68
69
70
71
72 public class MdcInjectionFilterTest {
73
74 static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
75 private static final int TIMEOUT = 5000;
76
77 private final MyAppender appender = new MyAppender();
78 private int port;
79 private NioSocketAcceptor acceptor;
80
81 private Level previousLevelRootLogger;
82 private ExecutorFilter executorFilter1;
83 private ExecutorFilter executorFilter2;
84
85 @Before
86 public void setUp() throws Exception {
87
88 org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
89 previousLevelRootLogger = org.apache.log4j.Logger.getRootLogger().getLevel();
90 org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
91 org.apache.log4j.Logger.getRootLogger().addAppender(appender);
92 acceptor = new NioSocketAcceptor();
93 }
94
95
96 @After
97 public void tearDown() throws Exception {
98 acceptor.dispose(true);
99 org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
100
101 destroy(executorFilter1);
102 destroy(executorFilter2);
103
104 List<String> after = getThreadNames();
105
106
107 Thread.sleep(50);
108 after = getThreadNames();
109
110 int count = 0;
111
112
113
114
115
116 while (contains(after, "Nio") && count++ < 10) {
117 Thread.sleep(50);
118 after = getThreadNames();
119 }
120
121 while (contains(after, "pool") && count++ < 10) {
122 Thread.sleep(50);
123 after = getThreadNames();
124 }
125
126
127
128 appender.clear();
129 }
130
131 private void destroy(ExecutorFilter executorFilter) throws InterruptedException {
132 if (executorFilter != null) {
133 ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
134 executor.shutdown();
135 while (!executor.isTerminated()) {
136
137 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
138 }
139 }
140 }
141
142 @Test
143 public void testSimpleChain() throws IOException, InterruptedException {
144 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
145 chain.addFirst("mdc-injector", new MdcInjectionFilter());
146 chain.addLast("dummy", new DummyIoFilter());
147 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
148 test(chain);
149 }
150
151 @Test
152 public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
153 executorFilter1 = new ExecutorFilter();
154 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
155 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
156 chain.addFirst("mdc-injector1", mdcInjectionFilter);
157 chain.addLast("dummy", new DummyIoFilter());
158 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
159 chain.addLast("executor" , executorFilter1);
160 chain.addLast("mdc-injector2", mdcInjectionFilter);
161 test(chain);
162 }
163
164 @Test
165 public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
166 executorFilter1 = new ExecutorFilter();
167 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
168 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
169 chain.addLast("executor" , executorFilter1);
170 chain.addLast("mdc-injector", mdcInjectionFilter);
171 chain.addLast("dummy", new DummyIoFilter());
172 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
173 test(chain);
174 }
175
176 @Test
177 public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
178 executorFilter1 = new ExecutorFilter();
179 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
180 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
181 chain.addLast("executor" , executorFilter1);
182 chain.addLast("mdc-injector", mdcInjectionFilter);
183 chain.addLast("dummy", new DummyIoFilter());
184 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
185 test(chain);
186 }
187
188 @Test
189 public void testMultipleFilters() throws IOException, InterruptedException {
190 executorFilter1 = new ExecutorFilter();
191 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
192 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
193 chain.addLast("executor" , executorFilter1);
194 chain.addLast("mdc-injector", mdcInjectionFilter);
195 chain.addLast("profiler", new ProfilerTimerFilter());
196 chain.addLast("dummy", new DummyIoFilter());
197 chain.addLast("logger", new LoggingFilter());
198 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
199 test(chain);
200 }
201
202 @Test
203 public void testTwoExecutorFilters() throws IOException, InterruptedException {
204 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
205 MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
206 executorFilter1 = new ExecutorFilter();
207 executorFilter2 = new ExecutorFilter();
208 chain.addLast("executorFilter1" , executorFilter1);
209 chain.addLast("mdc-injector1", mdcInjectionFilter);
210 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
211 chain.addLast("dummy", new DummyIoFilter());
212 chain.addLast("executorFilter2" , executorFilter2);
213
214
215 chain.addLast("mdc-injector2", mdcInjectionFilter);
216 test(chain);
217 }
218
219 @Test
220 public void testOnlyRemoteAddress() throws IOException, InterruptedException {
221 DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
222 chain.addFirst("mdc-injector", new MdcInjectionFilter(
223 MdcInjectionFilter.MdcKey.remoteAddress));
224 chain.addLast("dummy", new DummyIoFilter());
225 chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
226 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
227 acceptor.setHandler(simpleIoHandler);
228 acceptor.bind(new InetSocketAddress(0));
229 port = acceptor.getLocalAddress().getPort();
230 acceptor.setFilterChainBuilder(chain);
231
232 NioSocketConnector connector = new NioSocketConnector();
233 connector.setHandler(new IoHandlerAdapter());
234 connectAndWrite(connector,0);
235 connectAndWrite(connector,1);
236
237 simpleIoHandler.messageSentLatch.await();
238 simpleIoHandler.sessionIdleLatch.await();
239 simpleIoHandler.sessionClosedLatch.await();
240 connector.dispose(true);
241
242
243 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
244
245 for (LoggingEvent event : events) {
246 if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
247 continue;
248 }
249 for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
250 String key = mdcKey.name();
251 Object value = event.getMDC(key);
252 if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
253 assertNotNull(
254 "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
255 } else {
256 assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
257 }
258 }
259 }
260 }
261
262 private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
263
264 SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
265 acceptor.setHandler(simpleIoHandler);
266 acceptor.bind(new InetSocketAddress(0));
267 port = acceptor.getLocalAddress().getPort();
268 acceptor.setFilterChainBuilder(chain);
269
270 NioSocketConnector connector = new NioSocketConnector();
271 connector.setHandler(new IoHandlerAdapter());
272 SocketAddress remoteAddressClients[] = new SocketAddress[2];
273 remoteAddressClients[0] = connectAndWrite(connector,0);
274 remoteAddressClients[1] = connectAndWrite(connector,1);
275
276 simpleIoHandler.messageSentLatch.await();
277 simpleIoHandler.sessionIdleLatch.await();
278 simpleIoHandler.sessionClosedLatch.await();
279 connector.dispose(true);
280
281
282 List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
283
284 Set<String> loggersToCheck = new HashSet<String>();
285 loggersToCheck.add(MdcInjectionFilterTest.class.getName());
286 loggersToCheck.add(ProtocolCodecFilter.class.getName());
287 loggersToCheck.add(LoggingFilter.class.getName());
288
289
290 for (LoggingEvent event : events) {
291
292 if (loggersToCheck.contains(event.getLoggerName())) {
293 Object remoteAddress = event.getMDC("remoteAddress");
294 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
295 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
296 assertEquals(
297 "every event should have MDC[handlerClass]",
298 SimpleIoHandler.class.getName(),
299 event.getMDC("handlerClass") );
300 }
301 }
302
303 for (int i = 0; i < remoteAddressClients.length; i++) {
304 SocketAddress remoteAddressClient = remoteAddressClients[i];
305 assertEventExists(events, "sessionCreated", remoteAddressClient, null);
306 assertEventExists(events, "sessionOpened", remoteAddressClient, null);
307 assertEventExists(events, "decode", remoteAddressClient, null);
308 assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
309 assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
310 assertEventExists(events, "encode", remoteAddressClient, null);
311 assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
312 assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
313 assertEventExists(events, "messageSent-2", remoteAddressClient, null);
314 assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
315 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
316 assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
317 assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
318 }
319 }
320
321 private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
322 ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
323 connectFuture.awaitUninterruptibly(TIMEOUT);
324 IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
325 IoSession session = connectFuture.getSession();
326 session.write(message).awaitUninterruptibly(TIMEOUT);
327 return session.getLocalAddress();
328 }
329
330 private void assertEventExists(List<LoggingEvent> events,
331 String message,
332 SocketAddress address,
333 String user) {
334 InetSocketAddress remoteAddress = (InetSocketAddress) address;
335 for (LoggingEvent event : events) {
336 if (event.getMessage().equals(message) &&
337 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
338 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
339 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
340 if (user == null && event.getMDC("user") == null) {
341 return;
342 }
343 if (user != null && user.equals(event.getMDC("user"))) {
344 return;
345 }
346 return;
347 }
348 }
349 fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
350 }
351
352 private static class SimpleIoHandler extends IoHandlerAdapter {
353 CountDownLatch sessionIdleLatch = new CountDownLatch(2);
354 CountDownLatch sessionClosedLatch = new CountDownLatch(2);
355 CountDownLatch messageSentLatch = new CountDownLatch(2);
356
357
358
359
360 public SimpleIoHandler() {
361 super();
362 }
363
364 @Override
365 public void sessionCreated(IoSession session) throws Exception {
366 LOGGER.info("sessionCreated");
367 session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
368 }
369
370 @Override
371 public void sessionOpened(IoSession session) throws Exception {
372 LOGGER.info("sessionOpened");
373 }
374
375 @Override
376 public void sessionClosed(IoSession session) throws Exception {
377 LOGGER.info("sessionClosed");
378 sessionClosedLatch.countDown();
379 }
380
381 @Override
382 public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
383 LOGGER.info("sessionIdle");
384 sessionIdleLatch.countDown();
385 session.close(true);
386 }
387
388 @Override
389 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
390 LOGGER.info("exceptionCaught", cause);
391 }
392
393 @Override
394 public void messageReceived(IoSession session, Object message) throws Exception {
395 LOGGER.info("messageReceived-1");
396
397 String user = "user-" + message;
398 MdcInjectionFilter.setProperty(session, "user", user);
399 LOGGER.info("messageReceived-2");
400 session.getService().broadcast(message);
401 throw new RuntimeException("just a test, forcing exceptionCaught");
402 }
403
404 @Override
405 public void messageSent(IoSession session, Object message) throws Exception {
406 LOGGER.info("messageSent-1");
407 MdcInjectionFilter.removeProperty(session, "user");
408 LOGGER.info("messageSent-2");
409 messageSentLatch.countDown();
410 }
411 }
412
413 private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
414
415
416
417 public DummyProtocolCodecFactory() {
418 super();
419 }
420
421 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
422 return new ProtocolEncoderAdapter() {
423 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
424 LOGGER.info("encode");
425 IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
426 out.write(buffer);
427 }
428 };
429 }
430
431 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
432 return new ProtocolDecoderAdapter() {
433 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
434 if (in.remaining() >= 4) {
435 int value = in.getInt();
436 LOGGER.info("decode");
437 out.write(value);
438 }
439 }
440 };
441 }
442 }
443
444 private static class MyAppender extends AppenderSkeleton {
445 List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
446
447
448
449
450 public MyAppender() {
451 super();
452 }
453
454 public void clear() {
455 events.clear();
456 }
457
458 @Override
459 protected void append(final LoggingEvent loggingEvent) {
460 loggingEvent.getMDCCopy();
461 events.add(loggingEvent);
462 }
463
464 public boolean requiresLayout() {
465 return false;
466 }
467
468 public void close() {
469
470 }
471 }
472
473 static class DummyIoFilter extends IoFilterAdapter {
474 @Override
475 public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
476 LOGGER.info("DummyIoFilter.sessionOpened");
477 nextFilter.sessionOpened(session);
478 }
479 }
480
481
482 private List<String> getThreadNames() {
483 List<String> list = new ArrayList<String>();
484 int active = Thread.activeCount();
485 Thread[] threads = new Thread[active];
486 Thread.enumerate(threads);
487 for (Thread thread : threads) {
488 try {
489 String name = thread.getName();
490 list.add(name);
491 } catch (NullPointerException ignore) {
492 }
493 }
494 return list;
495 }
496
497 private boolean contains(List<String> list, String search) {
498 for (String s : list) {
499 if (s.contains(search)) {
500 return true;
501 }
502 }
503 return false;
504 }
505 }