View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
import junit.framework.Assert;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
44  
45  /**
46   * test the AbstractIoService
47   *
48   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
49   */
50  public class AbstractIoServiceTest {
51  
52    private static final int PORT = 9123;
53  
54    @Test
55    public void testDispose() throws IOException, InterruptedException {
56  
57      List threadsBefore = getThreadNames();
58  
59      final IoAcceptor acceptor = new NioSocketAcceptor();
60  
61      acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
62      acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
63  
64      acceptor.setHandler(  new ServerHandler() );
65  
66      acceptor.getSessionConfig().setReadBufferSize( 2048 );
67      acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
68      acceptor.bind( new InetSocketAddress(PORT) );
69      System.out.println("Server running ...");
70  
71      final NioSocketConnector connector = new NioSocketConnector();
72  
73      // Set connect timeout.
74      connector.setConnectTimeoutMillis(30 * 1000L);
75  
76      connector.setHandler(new ClientHandler());
77      connector.getFilterChain().addLast( "logger", new LoggingFilter() );
78      connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
79  
80      // Start communication.
81      ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
82      cf.awaitUninterruptibly();
83  
84      IoSession session = cf.getSession();
85  
86      // send a message
87      session.write("Hello World!\r");
88  
89      // wait until response is received
90      CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
91      latch.await();
92  
93      // close the session
94      CloseFuture closeFuture = session.close(false);
95  
96      System.out.println("session.close called");
97      //Thread.sleep(5);
98  
99      // wait for session close and then dispose the connector
100     closeFuture.addListener(new IoFutureListener<IoFuture>() {
101 
102       public void operationComplete(IoFuture future) {
103         System.out.println("managed session count=" + connector.getManagedSessionCount());
104         System.out.println("Disposing connector ...");
105         connector.dispose(true);
106         System.out.println("Disposing connector ... *finished*");
107 
108       }
109     });
110 
111     closeFuture.awaitUninterruptibly();    
112     acceptor.dispose(true);
113 
114     List threadsAfter = getThreadNames();
115 
116     System.out.println("threadsBefore = " + threadsBefore);
117     System.out.println("threadsAfter  = " + threadsAfter);
118 
119     // Assert.assertEquals(threadsBefore, threadsAfter);
120 
121   }
122 
123 
124   public static class ClientHandler extends IoHandlerAdapter {
125 
126     private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
127 
128     @Override
129     public void sessionCreated(IoSession session) throws Exception {
130       session.setAttribute("latch", new CountDownLatch(1));
131     }
132 
133     @Override
134     public void messageReceived(IoSession session, Object message) throws Exception {
135       LOGGER.info("client: messageReceived("+session+", "+message+")");
136       CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
137       latch.countDown();
138     }
139 
140     @Override
141     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
142       LOGGER.warn("exceptionCaught:", cause);
143     }
144   }
145 
146   public static class ServerHandler extends IoHandlerAdapter {
147 
148     private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
149 
150     @Override
151     public void messageReceived(IoSession session, Object message) throws Exception {
152       LOGGER.info("server: messageReceived("+session+", "+message+")");
153       session.write(message.toString());
154     }
155 
156     @Override
157     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
158       LOGGER.warn("exceptionCaught:", cause);
159     }
160 
161   }
162 
163   public static void main(String[] args) throws IOException, InterruptedException {
164     new AbstractIoServiceTest().testDispose();
165   }
166 
167   private List<String> getThreadNames() {
168       List<String> list = new ArrayList<String>();
169       int active = Thread.activeCount();
170       Thread[] threads = new Thread[active];
171       Thread.enumerate(threads);
172       for (Thread thread : threads) {
173           try {
174               String name = thread.getName();
175               list.add(name);
176           } catch (NullPointerException ignore) {
177           }
178       }
179       return list;
180   }
181 
182 }