1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import junit.framework.Assert;
23 import org.apache.mina.core.future.CloseFuture;
24 import org.apache.mina.core.future.ConnectFuture;
25 import org.apache.mina.core.future.IoFuture;
26 import org.apache.mina.core.future.IoFutureListener;
27 import org.apache.mina.core.session.IdleStatus;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.filter.codec.ProtocolCodecFilter;
30 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
31 import org.apache.mina.filter.logging.LoggingFilter;
32 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33 import org.apache.mina.transport.socket.nio.NioSocketConnector;
34 import org.junit.Test;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import java.io.IOException;
39 import java.net.InetSocketAddress;
40 import java.nio.charset.Charset;
41 import java.util.ArrayList;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44
45
46
47
48
49
50 public class AbstractIoServiceTest {
51
52 private static final int PORT = 9123;
53
54 @Test
55 public void testDispose() throws IOException, InterruptedException {
56
57 List threadsBefore = getThreadNames();
58
59 final IoAcceptor acceptor = new NioSocketAcceptor();
60
61 acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
62 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
63
64 acceptor.setHandler( new ServerHandler() );
65
66 acceptor.getSessionConfig().setReadBufferSize( 2048 );
67 acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
68 acceptor.bind( new InetSocketAddress(PORT) );
69 System.out.println("Server running ...");
70
71 final NioSocketConnector connector = new NioSocketConnector();
72
73
74 connector.setConnectTimeoutMillis(30 * 1000L);
75
76 connector.setHandler(new ClientHandler());
77 connector.getFilterChain().addLast( "logger", new LoggingFilter() );
78 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
79
80
81 ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
82 cf.awaitUninterruptibly();
83
84 IoSession session = cf.getSession();
85
86
87 session.write("Hello World!\r");
88
89
90 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
91 latch.await();
92
93
94 CloseFuture closeFuture = session.close(false);
95
96 System.out.println("session.close called");
97
98
99
100 closeFuture.addListener(new IoFutureListener<IoFuture>() {
101
102 public void operationComplete(IoFuture future) {
103 System.out.println("managed session count=" + connector.getManagedSessionCount());
104 System.out.println("Disposing connector ...");
105 connector.dispose(true);
106 System.out.println("Disposing connector ... *finished*");
107
108 }
109 });
110
111 closeFuture.awaitUninterruptibly();
112 acceptor.dispose(true);
113
114 List threadsAfter = getThreadNames();
115
116 System.out.println("threadsBefore = " + threadsBefore);
117 System.out.println("threadsAfter = " + threadsAfter);
118
119
120
121 }
122
123
124 public static class ClientHandler extends IoHandlerAdapter {
125
126 private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
127
128 @Override
129 public void sessionCreated(IoSession session) throws Exception {
130 session.setAttribute("latch", new CountDownLatch(1));
131 }
132
133 @Override
134 public void messageReceived(IoSession session, Object message) throws Exception {
135 LOGGER.info("client: messageReceived("+session+", "+message+")");
136 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
137 latch.countDown();
138 }
139
140 @Override
141 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
142 LOGGER.warn("exceptionCaught:", cause);
143 }
144 }
145
146 public static class ServerHandler extends IoHandlerAdapter {
147
148 private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
149
150 @Override
151 public void messageReceived(IoSession session, Object message) throws Exception {
152 LOGGER.info("server: messageReceived("+session+", "+message+")");
153 session.write(message.toString());
154 }
155
156 @Override
157 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
158 LOGGER.warn("exceptionCaught:", cause);
159 }
160
161 }
162
163 public static void main(String[] args) throws IOException, InterruptedException {
164 new AbstractIoServiceTest().testDispose();
165 }
166
167 private List<String> getThreadNames() {
168 List<String> list = new ArrayList<String>();
169 int active = Thread.activeCount();
170 Thread[] threads = new Thread[active];
171 Thread.enumerate(threads);
172 for (Thread thread : threads) {
173 try {
174 String name = thread.getName();
175 list.add(name);
176 } catch (NullPointerException ignore) {
177 }
178 }
179 return list;
180 }
181
182 }