View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.logging;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashSet;
33  import java.util.List;
34  import java.util.Set;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.log4j.AppenderSkeleton;
40  import org.apache.log4j.Level;
41  import org.apache.log4j.spi.LoggingEvent;
42  import org.apache.mina.core.buffer.IoBuffer;
43  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
44  import org.apache.mina.core.filterchain.IoFilterAdapter;
45  import org.apache.mina.core.future.ConnectFuture;
46  import org.apache.mina.core.service.IoHandlerAdapter;
47  import org.apache.mina.core.session.IdleStatus;
48  import org.apache.mina.core.session.IoSession;
49  import org.apache.mina.filter.codec.ProtocolCodecFactory;
50  import org.apache.mina.filter.codec.ProtocolCodecFilter;
51  import org.apache.mina.filter.codec.ProtocolDecoder;
52  import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
53  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
54  import org.apache.mina.filter.codec.ProtocolEncoder;
55  import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
56  import org.apache.mina.filter.codec.ProtocolEncoderOutput;
57  import org.apache.mina.filter.executor.ExecutorFilter;
58  import org.apache.mina.filter.statistic.ProfilerTimerFilter;
59  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
60  import org.apache.mina.transport.socket.nio.NioSocketConnector;
61  import org.junit.After;
62  import org.junit.Before;
63  import org.junit.Test;
64  import org.slf4j.Logger;
65  import org.slf4j.LoggerFactory;
66  
67  /**
68   * Tests {@link MdcInjectionFilter} in various scenarios.
69   *
70   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
71   */
72  public class MdcInjectionFilterTest {
73  
74      static Logger LOGGER = LoggerFactory.getLogger(MdcInjectionFilterTest.class);
75      private static final int TIMEOUT = 5000;
76  
77      private final MyAppender appender = new MyAppender();
78      private int port;
79      private NioSocketAcceptor acceptor;
80  
81      private Level previousLevelRootLogger;
82      private ExecutorFilter executorFilter1;
83      private ExecutorFilter executorFilter2;
84  
85      @Before
86      public void setUp() throws Exception {
87          // comment out next line if you want to see normal logging
88          org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
89          previousLevelRootLogger = org.apache.log4j.Logger.getRootLogger().getLevel();
90          org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
91          org.apache.log4j.Logger.getRootLogger().addAppender(appender);
92          acceptor = new NioSocketAcceptor();
93      }
94  
95  
96      @After
97      public void tearDown() throws Exception {
98          acceptor.dispose(true);
99          org.apache.log4j.Logger.getRootLogger().setLevel(previousLevelRootLogger);
100 
101         destroy(executorFilter1);
102         destroy(executorFilter2);
103 
104         List<String> after = getThreadNames();
105 
106         // give acceptor some time to shut down
107         Thread.sleep(50);
108         after = getThreadNames();
109 
110         int count = 0;
111 
112         // NOTE: this is *not* intended to be a permanent fix for this test-case.
113         // There used to be no API to block until the ExecutorService of AbstractIoService is terminated.
114         // The API exists now : dispose(true) so we should get rid of this code.
115 
116         while (contains(after, "Nio") && count++ < 10) {
117             Thread.sleep(50);
118             after = getThreadNames();
119             System.out.println("** after = " + after);
120         }
121         System.out.println("============================");
122 
123       while (contains(after, "pool") && count++ < 10) {
124           Thread.sleep(50);
125           after = getThreadNames();
126           System.out.println("** after = " + after);
127       }
128       System.out.println("============================");
129 
130         // The problem is that we clear the events of the appender here, but it's possible that a thread from
131         // a previous test still generates events during the execution of the next test
132         appender.clear();
133     }
134 
135     private void destroy(ExecutorFilter executorFilter) throws InterruptedException {
136         if (executorFilter != null) {
137             ExecutorService executor = (ExecutorService) executorFilter.getExecutor();
138             executor.shutdown();
139             while (!executor.isTerminated()) {
140                 //System.out.println("Waiting for termination of " + executorFilter);  
141                 executor.awaitTermination(10, TimeUnit.MILLISECONDS);
142             }
143         }
144     }
145 
146     @Test
147     public void testSimpleChain() throws IOException, InterruptedException {
148         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
149         chain.addFirst("mdc-injector", new MdcInjectionFilter());
150         chain.addLast("dummy", new DummyIoFilter());
151         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
152         test(chain);
153     }
154 
155     @Test
156     public void testExecutorFilterAtTheEnd() throws IOException, InterruptedException {
157         executorFilter1 = new ExecutorFilter();
158         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
159         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
160         chain.addFirst("mdc-injector1", mdcInjectionFilter);
161         chain.addLast("dummy", new DummyIoFilter());
162         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
163         chain.addLast("executor" , executorFilter1);
164         chain.addLast("mdc-injector2", mdcInjectionFilter);
165         test(chain);
166     }
167 
168     @Test
169     public void testExecutorFilterAtBeginning() throws IOException, InterruptedException {
170         executorFilter1 = new ExecutorFilter();
171         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
172         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
173         chain.addLast("executor" , executorFilter1);
174         chain.addLast("mdc-injector", mdcInjectionFilter);
175         chain.addLast("dummy", new DummyIoFilter());
176         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
177         test(chain);
178     }
179 
180     @Test
181     public void testExecutorFilterBeforeProtocol() throws IOException, InterruptedException {
182         executorFilter1 = new ExecutorFilter();
183         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
184         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
185         chain.addLast("executor" , executorFilter1);
186         chain.addLast("mdc-injector", mdcInjectionFilter);
187         chain.addLast("dummy", new DummyIoFilter());
188         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
189         test(chain);
190     }
191 
192     @Test
193     public void testMultipleFilters() throws IOException, InterruptedException {
194         executorFilter1 = new ExecutorFilter();
195         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
196         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
197         chain.addLast("executor" , executorFilter1);
198         chain.addLast("mdc-injector", mdcInjectionFilter);
199         chain.addLast("profiler", new ProfilerTimerFilter());
200         chain.addLast("dummy", new DummyIoFilter());
201         chain.addLast("logger", new LoggingFilter());
202         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
203         test(chain);
204     }
205 
206     @Test
207     public void testTwoExecutorFilters() throws IOException, InterruptedException {
208         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
209         MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
210         executorFilter1 = new ExecutorFilter();
211         executorFilter2 = new ExecutorFilter();
212         chain.addLast("executorFilter1" , executorFilter1);
213         chain.addLast("mdc-injector1", mdcInjectionFilter);
214         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
215         chain.addLast("dummy", new DummyIoFilter());
216         chain.addLast("executorFilter2" , executorFilter2);
217         // add the MdcInjectionFilter instance after every ExecutorFilter
218         // it's important to use the same MdcInjectionFilter instance
219         chain.addLast("mdc-injector2",  mdcInjectionFilter);
220         test(chain);
221     }
222 
223     @Test
224     public void testOnlyRemoteAddress() throws IOException, InterruptedException {
225         DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
226         chain.addFirst("mdc-injector", new MdcInjectionFilter(
227             MdcInjectionFilter.MdcKey.remoteAddress));
228         chain.addLast("dummy", new DummyIoFilter());
229         chain.addLast("protocol", new ProtocolCodecFilter(new DummyProtocolCodecFactory()));
230         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
231         acceptor.setHandler(simpleIoHandler);
232         acceptor.bind(new InetSocketAddress(0));
233         port = acceptor.getLocalAddress().getPort();
234         acceptor.setFilterChainBuilder(chain);
235         // create some clients
236         NioSocketConnector connector = new NioSocketConnector();
237         connector.setHandler(new IoHandlerAdapter());
238         connectAndWrite(connector,0);
239         connectAndWrite(connector,1);
240         // wait until Iohandler has received all events
241         simpleIoHandler.messageSentLatch.await();
242         simpleIoHandler.sessionIdleLatch.await();
243         simpleIoHandler.sessionClosedLatch.await();
244         connector.dispose(true);
245 
246         // make a copy to prevent ConcurrentModificationException
247         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
248         // verify that all logging events have correct MDC
249         for (LoggingEvent event : events) {
250             if (event.getLoggerName().startsWith("org.apache.mina.core.service.AbstractIoService")) {
251               continue;
252             }
253             for (MdcInjectionFilter.MdcKey mdcKey : MdcInjectionFilter.MdcKey.values()) {
254               String key = mdcKey.name();
255               Object value = event.getMDC(key);
256               if (mdcKey == MdcInjectionFilter.MdcKey.remoteAddress) {
257                   assertNotNull(
258                       "MDC[remoteAddress] not set for [" + event.getMessage() + "]", value);
259               } else {
260                   assertNull("MDC[" + key + "] set for [" + event.getMessage() + "]", value);
261               }
262             }
263         }
264     }
265 
266     private void test(DefaultIoFilterChainBuilder chain) throws IOException, InterruptedException {
267         // configure the server
268         SimpleIoHandler simpleIoHandler = new SimpleIoHandler();
269         acceptor.setHandler(simpleIoHandler);
270         acceptor.bind(new InetSocketAddress(0));
271         port = acceptor.getLocalAddress().getPort();
272         acceptor.setFilterChainBuilder(chain);
273         // create some clients
274         NioSocketConnector connector = new NioSocketConnector();
275         connector.setHandler(new IoHandlerAdapter());
276         SocketAddress remoteAddressClients[] = new SocketAddress[2];
277         remoteAddressClients[0] = connectAndWrite(connector,0);
278         remoteAddressClients[1] = connectAndWrite(connector,1);
279         // wait until Iohandler has received all events
280         simpleIoHandler.messageSentLatch.await();
281         simpleIoHandler.sessionIdleLatch.await();
282         simpleIoHandler.sessionClosedLatch.await();
283         connector.dispose(true);
284 
285         // make a copy to prevent ConcurrentModificationException
286         List<LoggingEvent> events = new ArrayList<LoggingEvent>(appender.events);
287 
288         Set<String> loggersToCheck = new HashSet<String>();
289         loggersToCheck.add(MdcInjectionFilterTest.class.getName());
290         loggersToCheck.add(ProtocolCodecFilter.class.getName());
291         loggersToCheck.add(LoggingFilter.class.getName());
292 
293         // verify that all logging events have correct MDC
294         for (LoggingEvent event : events) {
295 
296             if (loggersToCheck.contains(event.getLoggerName())) {
297                 Object remoteAddress = event.getMDC("remoteAddress");
298                 assertNotNull("MDC[remoteAddress] not set for [" + event.getMessage() + "]", remoteAddress);
299                 assertNotNull("MDC[remotePort] not set for [" + event.getMessage() + "]", event.getMDC("remotePort"));
300                 assertEquals(
301                     "every event should have MDC[handlerClass]",
302                     SimpleIoHandler.class.getName(),
303                     event.getMDC("handlerClass") );
304             }
305         }
306         // assert we have received all expected logging events for each client
307         for (int i = 0; i < remoteAddressClients.length; i++) {
308             SocketAddress remoteAddressClient = remoteAddressClients[i];
309             assertEventExists(events, "sessionCreated", remoteAddressClient, null);
310             assertEventExists(events, "sessionOpened", remoteAddressClient, null);
311             assertEventExists(events, "decode", remoteAddressClient, null);
312             assertEventExists(events, "messageReceived-1", remoteAddressClient, null);
313             assertEventExists(events, "messageReceived-2", remoteAddressClient, "user-" + i);
314             assertEventExists(events, "encode", remoteAddressClient, null);
315             assertEventExists(events, "exceptionCaught", remoteAddressClient, "user-" + i);
316             assertEventExists(events, "messageSent-1", remoteAddressClient, "user-" + i);
317             assertEventExists(events, "messageSent-2", remoteAddressClient, null);
318             assertEventExists(events, "sessionIdle", remoteAddressClient, "user-" + i);
319             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
320             assertEventExists(events, "sessionClosed", remoteAddressClient, "user-" + i);
321             assertEventExists(events, "DummyIoFilter.sessionOpened", remoteAddressClient, "user-" + i);
322         }
323     }
324 
325     private SocketAddress connectAndWrite(NioSocketConnector connector, int clientNr) {
326         ConnectFuture connectFuture = connector.connect(new InetSocketAddress("localhost", port));
327         connectFuture.awaitUninterruptibly(TIMEOUT);
328         IoBuffer message = IoBuffer.allocate(4).putInt(clientNr).flip();
329         IoSession session = connectFuture.getSession();
330         session.write(message).awaitUninterruptibly(TIMEOUT);
331         return session.getLocalAddress();
332     }
333 
334     private void assertEventExists(List<LoggingEvent> events,
335                                    String message,
336                                    SocketAddress address,
337                                    String user) {
338         InetSocketAddress remoteAddress = (InetSocketAddress) address;
339         for (LoggingEvent event : events) {
340             if (event.getMessage().equals(message) &&
341                 event.getMDC("remoteAddress").equals(remoteAddress.toString()) &&
342                 event.getMDC("remoteIp").equals(remoteAddress.getAddress().getHostAddress()) &&
343                 event.getMDC("remotePort").equals(remoteAddress.getPort()+"") ) {
344                 if (user == null && event.getMDC("user") == null) {
345                     return;
346                 }
347                 if (user != null && user.equals(event.getMDC("user"))) {
348                     return;
349                 }
350                 return;
351             }
352         }
353         fail("No LoggingEvent found from [" + remoteAddress +"] with message [" + message + "]");
354     }
355 
356     private static class SimpleIoHandler extends IoHandlerAdapter {
357         CountDownLatch sessionIdleLatch = new CountDownLatch(2);
358         CountDownLatch sessionClosedLatch = new CountDownLatch(2);
359         CountDownLatch messageSentLatch = new CountDownLatch(2);
360 
361         /**
362          * Default constructor
363          */
364         public SimpleIoHandler() {
365             super();
366         }
367 
368         @Override
369         public void sessionCreated(IoSession session) throws Exception {
370             LOGGER.info("sessionCreated");
371             session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 1);
372         }
373 
374         @Override
375         public void sessionOpened(IoSession session) throws Exception {
376             LOGGER.info("sessionOpened");
377         }
378 
379         @Override
380         public void sessionClosed(IoSession session) throws Exception {
381             LOGGER.info("sessionClosed");
382             sessionClosedLatch.countDown();
383         }
384 
385         @Override
386         public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
387             LOGGER.info("sessionIdle");
388             sessionIdleLatch.countDown();
389             session.close(true);
390         }
391 
392         @Override
393         public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
394             LOGGER.info("exceptionCaught", cause);
395         }
396 
397         @Override
398         public void messageReceived(IoSession session, Object message) throws Exception {
399             LOGGER.info("messageReceived-1");
400             // adding a custom property to the context
401             String user = "user-" + message;
402             MdcInjectionFilter.setProperty(session, "user", user);
403             LOGGER.info("messageReceived-2");
404             session.getService().broadcast(message);
405             throw new RuntimeException("just a test, forcing exceptionCaught");
406         }
407 
408         @Override
409         public void messageSent(IoSession session, Object message) throws Exception {
410             LOGGER.info("messageSent-1");
411             MdcInjectionFilter.removeProperty(session, "user");
412             LOGGER.info("messageSent-2");
413             messageSentLatch.countDown();
414         }
415     }
416 
417     private static class DummyProtocolCodecFactory implements ProtocolCodecFactory {
418         /**
419          * Default constructor
420          */
421         public DummyProtocolCodecFactory() {
422             super();
423         }
424 
425         public ProtocolEncoder getEncoder(IoSession session) throws Exception {
426             return new ProtocolEncoderAdapter() {
427                 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
428                     LOGGER.info("encode");
429                     IoBuffer buffer = IoBuffer.allocate(4).putInt(123).flip();
430                     out.write(buffer);
431                 }
432             };
433         }
434 
435         public ProtocolDecoder getDecoder(IoSession session) throws Exception {
436             return new ProtocolDecoderAdapter() {
437                 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
438                     if (in.remaining() >= 4) {
439                         int value = in.getInt();
440                         LOGGER.info("decode");
441                         out.write(value);
442                     }
443                 }
444             };
445         }
446     }
447 
448     private static class MyAppender extends AppenderSkeleton {
449         List<LoggingEvent> events = Collections.synchronizedList(new ArrayList<LoggingEvent>());
450 
451         /**
452          * Default constructor
453          */
454         public MyAppender() {
455             super();
456         }
457 
458         public void clear() {
459             events.clear();
460         }
461 
462         @Override
463         protected void append(final LoggingEvent loggingEvent) {
464             loggingEvent.getMDCCopy();
465             events.add(loggingEvent);
466         }
467 
468         @Override
469         public boolean requiresLayout() {
470             return false;
471         }
472 
473         @Override
474         public void close() {
475             // Do nothing
476         }
477     }
478 
479     static class DummyIoFilter extends IoFilterAdapter {
480         @Override
481         public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
482             LOGGER.info("DummyIoFilter.sessionOpened");
483             nextFilter.sessionOpened(session);
484         }
485     }
486 
487 
488     private List<String> getThreadNames() {
489         List<String> list = new ArrayList<String>();
490         int active = Thread.activeCount();
491         Thread[] threads = new Thread[active];
492         Thread.enumerate(threads);
493         for (Thread thread : threads) {
494             try {
495                 String name = thread.getName();
496                 list.add(name);
497             } catch (NullPointerException ignore) {
498             }
499         }
500         return list;
501     }
502 
503     private boolean contains(List<String> list, String search) {
504         for (String s : list) {
505             if (s.contains(search)) {
506                 return true;
507             }
508         }
509         return false;
510     }
511 }