View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.logging;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashSet;
33  import java.util.List;
34  import java.util.Set;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.log4j.AppenderSkeleton;
40  import org.apache.log4j.Level;
41  import org.apache.log4j.spi.LoggingEvent;
42  import org.apache.mina.core.buffer.IoBuffer;
43  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
44  import org.apache.mina.core.filterchain.IoFilterAdapter;
45  import org.apache.mina.core.future.ConnectFuture;
46  import org.apache.mina.core.service.IoHandlerAdapter;
47  import org.apache.mina.core.session.IdleStatus;
48  import org.apache.mina.core.session.IoSession;
49  import org.apache.mina.filter.codec.ProtocolCodecFactory;
50  import org.apache.mina.filter.codec.ProtocolCodecFilter;
51  import org.apache.mina.filter.codec.ProtocolDecoder;
52  import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
53  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
54  import org.apache.mina.filter.codec.ProtocolEncoder;
55  import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
56  import org.apache.mina.filter.codec.ProtocolEncoderOutput;
57  import org.apache.mina.filter.executor.ExecutorFilter;
58  import org.apache.mina.filter.statistic.ProfilerTimerFilter;
59  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
60  import org.apache.mina.transport.socket.nio.NioSocketConnector;
61  import org.junit.After;
62  import org.junit.Before;
63  import org.junit.Test;
64  import org.slf4j.Logger;
65  import org.slf4j.LoggerFactory;
66  
67  /**
68   * Tests {@link MdcInjectionFilter} in various scenarios.
69   *
70   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
71   */
72  public class MdcInjectionFilterTest {
73  
74      static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
75      private static final int TIMEOUT = 5000;
76  
77      private final MyAppender appender = new MyAppender();
78      private int port;
79      private NioSocketAcceptor acceptor;
80  
81      private Level previousLevelRootLogger;
82      private ExecutorFilter executorFilter1;
83      private ExecutorFilter executorFilter2;
84  
85      @Before
86      public void setUp() throws Exception {
87          // comment out next line if you want to see normal logging
88          org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
89          previousLevelRootLogger = org.apache.log4j.Logger.getRootLogger().getLevel();
90          org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
91          org.apache.log4j.Logger.getRootLogger().addAppender(appender);
92          acceptor = new NioSocketAcceptor();
93      }
94  
95  
96      @After
97      public void tearDown() throws Exception {
98          acceptor.dispose(true);
99          org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
100 
101         destroy(executorFilter1);
102         destroy(executorFilter2);
103 
104         List<String> after = getThreadNames();
105 
106         // give acceptor some time to shut down
107         Thread.sleep(50);
108         after = getThreadNames();
109 
110         int count = 0;
111 
112         // NOTE: this is *not* intended to be a permanent fix for this test-case.
113         // There used to be no API to block until the ExecutorService of AbstractIoService is terminated.
114         // The API exists now : dispose(true) so we should get rid of this code.
115 
116         while (contains(after, "Nio") && count++ < 10) {
117             Thread.sleep(50);
118             after = getThreadNames();
119         }
120 
121       while (contains(after, "pool") && count++ < 10) {
122           Thread.sleep(50);
123           after = getThreadNames();
124       }
125 
126         // The problem is that we clear the events of the appender here, but it's possible that a thread from
127         // a previous test still generates events during the execution of the next test
128         appender.clear();
129     }
130 
131     private void destroy(ExecutorFilter executorFilter) throws InterruptedException {
132         if (executorFilter != null) {
133             ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
134             executor.shutdown();
135             while (!executor.isTerminated()) {
136                 //System.out.println("Waiting for termination of " + executorFilter);  
137                 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
138             }
139         }
140     }
141 
142     @Test
143     public void testSimpleChain() throws IOException, InterruptedException {
144         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
145         chain.addFirst("mdc-injector", new MdcInjectionFilter());
146         chain.addLast("dummy", new DummyIoFilter());
147         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
148         test(chain);
149     }
150 
151     @Test
152     public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
153         executorFilter1 = new ExecutorFilter();
154         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
155         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
156         chain.addFirst("mdc-injector1", mdcInjectionFilter);
157         chain.addLast("dummy", new DummyIoFilter());
158         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
159         chain.addLast("executor" , executorFilter1);
160         chain.addLast("mdc-injector2", mdcInjectionFilter);
161         test(chain);
162     }
163 
164     @Test
165     public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
166         executorFilter1 = new ExecutorFilter();
167         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
168         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
169         chain.addLast("executor" , executorFilter1);
170         chain.addLast("mdc-injector", mdcInjectionFilter);
171         chain.addLast("dummy", new DummyIoFilter());
172         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
173         test(chain);
174     }
175 
176     @Test
177     public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
178         executorFilter1 = new ExecutorFilter();
179         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
180         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
181         chain.addLast("executor" , executorFilter1);
182         chain.addLast("mdc-injector", mdcInjectionFilter);
183         chain.addLast("dummy", new DummyIoFilter());
184         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
185         test(chain);
186     }
187 
188     @Test
189     public void testMultipleFilters() throws IOException, InterruptedException {
190         executorFilter1 = new ExecutorFilter();
191         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
192         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
193         chain.addLast("executor" , executorFilter1);
194         chain.addLast("mdc-injector", mdcInjectionFilter);
195         chain.addLast("profiler", new ProfilerTimerFilter());
196         chain.addLast("dummy", new DummyIoFilter());
197         chain.addLast("logger", new LoggingFilter());
198         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
199         test(chain);
200     }
201 
202     @Test
203     public void testTwoExecutorFilters() throws IOException, InterruptedException {
204         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
205         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
206         executorFilter1 = new ExecutorFilter();
207         executorFilter2 = new ExecutorFilter();
208         chain.addLast("executorFilter1" , executorFilter1);
209         chain.addLast("mdc-injector1", mdcInjectionFilter);
210         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
211         chain.addLast("dummy", new DummyIoFilter());
212         chain.addLast("executorFilter2" , executorFilter2);
213         // add the MdcInjectionFilter instance after every ExecutorFilter
214         // it's important to use the same MdcInjectionFilter instance
215         chain.addLast("mdc-injector2",  mdcInjectionFilter);
216         test(chain);
217     }
218 
219     @Test
220     public void testOnlyRemoteAddress() throws IOException, InterruptedException {
221         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
222         chain.addFirst("mdc-injector", new MdcInjectionFilter(
223             MdcInjectionFilter.MdcKey.remoteAddress));
224         chain.addLast("dummy", new DummyIoFilter());
225         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
226         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
227         acceptor.setHandler(simpleIoHandler);
228         acceptor.bind(new InetSocketAddress(0));
229         port = acceptor.getLocalAddress().getPort();
230         acceptor.setFilterChainBuilder(chain);
231         // create some clients
232         NioSocketConnector connector = new NioSocketConnector();
233         connector.setHandler(new IoHandlerAdapter());
234         connectAndWrite(connector,0);
235         connectAndWrite(connector,1);
236         // wait until Iohandler has received all events
237         simpleIoHandler.messageSentLatch.await();
238         simpleIoHandler.sessionIdleLatch.await();
239         simpleIoHandler.sessionClosedLatch.await();
240         connector.dispose(true);
241 
242         // make a copy to prevent ConcurrentModificationException
243         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
244         // verify that all logging events have correct MDC
245         for (LoggingEvent event : events) {
246             if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
247               continue;
248             }
249             for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
250               String key = mdcKey.name();
251               Object value = event.getMDC(key);
252               if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
253                   assertNotNull(
254                       "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
255               } else {
256                   assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
257               }
258             }
259         }
260     }
261 
262     private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
263         // configure the server
264         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
265         acceptor.setHandler(simpleIoHandler);
266         acceptor.bind(new InetSocketAddress(0));
267         port = acceptor.getLocalAddress().getPort();
268         acceptor.setFilterChainBuilder(chain);
269         // create some clients
270         NioSocketConnector connector = new NioSocketConnector();
271         connector.setHandler(new IoHandlerAdapter());
272         SocketAddress remoteAddressClients[] = new SocketAddress[2];
273         remoteAddressClients[0] = connectAndWrite(connector,0);
274         remoteAddressClients[1] = connectAndWrite(connector,1);
275         // wait until Iohandler has received all events
276         simpleIoHandler.messageSentLatch.await();
277         simpleIoHandler.sessionIdleLatch.await();
278         simpleIoHandler.sessionClosedLatch.await();
279         connector.dispose(true);
280 
281         // make a copy to prevent ConcurrentModificationException
282         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
283 
284         Set<String> loggersToCheck = new HashSet<String>();
285         loggersToCheck.add(MdcInjectionFilterTest.class.getName());
286         loggersToCheck.add(ProtocolCodecFilter.class.getName());
287         loggersToCheck.add(LoggingFilter.class.getName());
288 
289         // verify that all logging events have correct MDC
290         for (LoggingEvent event : events) {
291 
292             if (loggersToCheck.contains(event.getLoggerName())) {
293                 Object remoteAddress = event.getMDC("remoteAddress");
294                 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
295                 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
296                 assertEquals(
297                     "every event should have MDC[handlerClass]",
298                     SimpleIoHandler.class.getName(),
299                     event.getMDC("handlerClass") );
300             }
301         }
302         // assert we have received all expected logging events for each client
303         for (int i = 0; i < remoteAddressClients.length; i++) {
304             SocketAddress remoteAddressClient = remoteAddressClients[i];
305             assertEventExists(events, "sessionCreated", remoteAddressClient, null);
306             assertEventExists(events, "sessionOpened", remoteAddressClient, null);
307             assertEventExists(events, "decode", remoteAddressClient, null);
308             assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
309             assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
310             assertEventExists(events, "encode", remoteAddressClient, null);
311             assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
312             assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
313             assertEventExists(events, "messageSent-2", remoteAddressClient, null);
314             assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
315             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
316             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
317             assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
318         }
319     }
320 
321     private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
322         ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
323         connectFuture.awaitUninterruptibly(TIMEOUT);
324         IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
325         IoSession session = connectFuture.getSession();
326         session.write(message).awaitUninterruptibly(TIMEOUT);
327         return session.getLocalAddress();
328     }
329 
330     private void assertEventExists(List<LoggingEvent> events,
331                                    String message,
332                                    SocketAddress address,
333                                    String user) {
334         InetSocketAddress remoteAddress = (InetSocketAddress) address;
335         for (LoggingEvent event : events) {
336             if (event.getMessage().equals(message) &&
337                 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
338                 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
339                 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
340                 if (user == null && event.getMDC("user") == null) {
341                     return;
342                 }
343                 if (user != null && user.equals(event.getMDC("user"))) {
344                     return;
345                 }
346                 return;
347             }
348         }
349         fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
350     }
351 
352     private static class SimpleIoHandler extends IoHandlerAdapter {
353         CountDownLatch sessionIdleLatch = new CountDownLatch(2);
354         CountDownLatch sessionClosedLatch = new CountDownLatch(2);
355         CountDownLatch messageSentLatch = new CountDownLatch(2);
356 
357         /**
358          * Default constructor
359          */
360         public SimpleIoHandler() {
361             super();
362         }
363 
364         @Override
365         public void sessionCreated(IoSession session) throws Exception {
366             LOGGER.info("sessionCreated");
367             session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
368         }
369 
370         @Override
371         public void sessionOpened(IoSession session) throws Exception {
372             LOGGER.info("sessionOpened");
373         }
374 
375         @Override
376         public void sessionClosed(IoSession session) throws Exception {
377             LOGGER.info("sessionClosed");
378             sessionClosedLatch.countDown();
379         }
380 
381         @Override
382         public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
383             LOGGER.info("sessionIdle");
384             sessionIdleLatch.countDown();
385             session.close(true);
386         }
387 
388         @Override
389         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
390             LOGGER.info("exceptionCaught", cause);
391         }
392 
393         @Override
394         public void messageReceived(IoSession session, Object message) throws Exception {
395             LOGGER.info("messageReceived-1");
396             // adding a custom property to the context
397             String user = "user-" + message;
398             MdcInjectionFilter.setProperty(session, "user", user);
399             LOGGER.info("messageReceived-2");
400             session.getService().broadcast(message);
401             throw new RuntimeException("just a test, forcing exceptionCaught");
402         }
403 
404         @Override
405         public void messageSent(IoSession session, Object message) throws Exception {
406             LOGGER.info("messageSent-1");
407             MdcInjectionFilter.removeProperty(session, "user");
408             LOGGER.info("messageSent-2");
409             messageSentLatch.countDown();
410         }
411     }
412 
413     private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
414         /**
415          * Default constructor
416          */
417         public DummyProtocolCodecFactory() {
418             super();
419         }
420 
421         public ProtocolEncoder getEncoder(IoSession session) throws Exception {
422             return new ProtocolEncoderAdapter() {
423                 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
424                     LOGGER.info("encode");
425                     IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
426                     out.write(buffer);
427                 }
428             };
429         }
430 
431         public ProtocolDecoder getDecoder(IoSession session) throws Exception {
432             return new ProtocolDecoderAdapter() {
433                 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
434                     if (in.remaining() >= 4) {
435                         int value = in.getInt();
436                         LOGGER.info("decode");
437                         out.write(value);
438                     }
439                 }
440             };
441         }
442     }
443 
444     private static class MyAppender extends AppenderSkeleton {
445         List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
446 
447         /**
448          * Default constructor
449          */
450         public MyAppender() {
451             super();
452         }
453 
454         public void clear() {
455             events.clear();
456         }
457 
458         @Override
459         protected void append(final LoggingEvent loggingEvent) {
460             loggingEvent.getMDCCopy();
461             events.add(loggingEvent);
462         }
463 
464         public boolean requiresLayout() {
465             return false;
466         }
467 
468         public void close() {
469             // Do nothing
470         }
471     }
472 
473     static class DummyIoFilter extends IoFilterAdapter {
474         @Override
475         public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
476             LOGGER.info("DummyIoFilter.sessionOpened");
477             nextFilter.sessionOpened(session);
478         }
479     }
480 
481 
482     private List<String> getThreadNames() {
483         List<String> list = new ArrayList<String>();
484         int active = Thread.activeCount();
485         Thread[] threads = new Thread[active];
486         Thread.enumerate(threads);
487         for (Thread thread : threads) {
488             try {
489                 String name = thread.getName();
490                 list.add(name);
491             } catch (NullPointerException ignore) {
492             }
493         }
494         return list;
495     }
496 
497     private boolean contains(List<String> list, String search) {
498         for (String s : list) {
499             if (s.contains(search)) {
500                 return true;
501             }
502         }
503         return false;
504     }
505 }