1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import junit.framework.Assert;
23 import org.apache.mina.core.future.CloseFuture;
24 import org.apache.mina.core.future.ConnectFuture;
25 import org.apache.mina.core.future.IoFuture;
26 import org.apache.mina.core.future.IoFutureListener;
27 import org.apache.mina.core.session.IdleStatus;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.filter.codec.ProtocolCodecFilter;
30 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
31 import org.apache.mina.filter.logging.LoggingFilter;
32 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33 import org.apache.mina.transport.socket.nio.NioSocketConnector;
34 import org.junit.Test;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import java.io.IOException;
39 import java.net.InetSocketAddress;
40 import java.nio.charset.Charset;
41 import java.util.ArrayList;
42 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44
45
46
47
48
49
50 public class AbstractIoServiceTest {
51
52 private static final int PORT = 9123;
53
54 @Test
55 public void testDispose() throws IOException, InterruptedException {
56
57 List threadsBefore = getThreadNames();
58
59 final IoAcceptor acceptor = new NioSocketAcceptor();
60
61 acceptor.getFilterChain().addLast("logger", new LoggingFilter());
62 acceptor.getFilterChain().addLast("codec",
63 new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
64
65 acceptor.setHandler(new ServerHandler());
66
67 acceptor.getSessionConfig().setReadBufferSize(2048);
68 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
69 acceptor.bind(new InetSocketAddress(PORT));
70 System.out.println("Server running ...");
71
72 final NioSocketConnector connector = new NioSocketConnector();
73
74
75 connector.setConnectTimeoutMillis(30 * 1000L);
76
77 connector.setHandler(new ClientHandler());
78 connector.getFilterChain().addLast("logger", new LoggingFilter());
79 connector.getFilterChain().addLast("codec",
80 new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
81
82
83 ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
84 cf.awaitUninterruptibly();
85
86 IoSession session = cf.getSession();
87
88
89 session.write("Hello World!\r");
90
91
92 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
93 latch.await();
94
95
96 CloseFuture closeFuture = session.close(false);
97
98 System.out.println("session.close called");
99
100
101
102 closeFuture.addListener(new IoFutureListener<IoFuture>() {
103
104 public void operationComplete(IoFuture future) {
105 System.out.println("managed session count=" + connector.getManagedSessionCount());
106 System.out.println("Disposing connector ...");
107 connector.dispose(true);
108 System.out.println("Disposing connector ... *finished*");
109
110 }
111 });
112
113 closeFuture.awaitUninterruptibly();
114 acceptor.dispose(true);
115
116 List threadsAfter = getThreadNames();
117
118 System.out.println("threadsBefore = " + threadsBefore);
119 System.out.println("threadsAfter = " + threadsAfter);
120
121
122
123 }
124
125 public static class ClientHandler extends IoHandlerAdapter {
126
127 private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
128
129 @Override
130 public void sessionCreated(IoSession session) throws Exception {
131 session.setAttribute("latch", new CountDownLatch(1));
132 }
133
134 @Override
135 public void messageReceived(IoSession session, Object message) throws Exception {
136 LOGGER.info("client: messageReceived(" + session + ", " + message + ")");
137 CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
138 latch.countDown();
139 }
140
141 @Override
142 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
143 LOGGER.warn("exceptionCaught:", cause);
144 }
145 }
146
147 public static class ServerHandler extends IoHandlerAdapter {
148
149 private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
150
151 @Override
152 public void messageReceived(IoSession session, Object message) throws Exception {
153 LOGGER.info("server: messageReceived(" + session + ", " + message + ")");
154 session.write(message.toString());
155 }
156
157 @Override
158 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
159 LOGGER.warn("exceptionCaught:", cause);
160 }
161
162 }
163
164 public static void main(String[] args) throws IOException, InterruptedException {
165 new AbstractIoServiceTest().testDispose();
166 }
167
168 private List<String> getThreadNames() {
169 List<String> list = new ArrayList<String>();
170 int active = Thread.activeCount();
171 Thread[] threads = new Thread[active];
172 Thread.enumerate(threads);
173 for (Thread thread : threads) {
174 try {
175 String name = thread.getName();
176 list.add(name);
177 } catch (NullPointerException ignore) {
178 }
179 }
180 return list;
181 }
182
183 }