1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.example.echoserver;
21
22 import java.net.InetSocketAddress;
23
24 import junit.framework.Assert;
25
26 import org.apache.mina.core.RuntimeIoException;
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.future.ConnectFuture;
29 import org.apache.mina.core.future.WriteFuture;
30 import org.apache.mina.core.service.IoConnector;
31 import org.apache.mina.core.service.IoHandlerAdapter;
32 import org.apache.mina.core.session.IoSession;
33 import org.apache.mina.core.write.WriteException;
34 import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
35 import org.apache.mina.filter.ssl.SslFilter;
36 import org.apache.mina.transport.socket.nio.NioDatagramConnector;
37 import org.apache.mina.transport.socket.nio.NioSocketConnector;
38 import org.apache.mina.util.AvailablePortFinder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47
48 public class ConnectorTest extends AbstractTest {
49 private final static Logger logger = LoggerFactory.getLogger(ConnectorTest.class);
50
51 private static final int TIMEOUT = 10000;
52
53 private final int COUNT = 10;
54
55 private final int DATA_SIZE = 16;
56
57 private EchoConnectorHandler handler;
58 private SslFilter connectorSSLFilter;
59
60 public ConnectorTest() {
61 }
62
63 @Override
64 protected void setUp() throws Exception {
65 super.setUp();
66 handler = new EchoConnectorHandler();
67 connectorSSLFilter = new SslFilter(BogusSslContextFactory
68 .getInstance(false));
69 connectorSSLFilter.setUseClientMode(true);
70 }
71
72 public void testTCP() throws Exception {
73 IoConnector connector = new NioSocketConnector();
74 testConnector(connector);
75 }
76
77 public void testTCPWithSSL() throws Exception {
78 useSSL = true;
79
80 IoConnector connector = new NioSocketConnector();
81
82
83 connector.getFilterChain().addLast("SSL", connectorSSLFilter);
84 testConnector(connector);
85 }
86
87 public void testUDP() throws Exception {
88 IoConnector connector = new NioDatagramConnector();
89 testConnector(connector);
90 }
91
92 private void testConnector(IoConnector connector) throws Exception {
93 connector.setHandler(handler);
94
95
96 testConnector(connector, false);
97
98
99 testConnector(connector, true);
100 }
101
102 private void testConnector(IoConnector connector, boolean useLocalAddress)
103 throws Exception {
104 IoSession session = null;
105 if (!useLocalAddress) {
106 ConnectFuture future = connector.connect(new InetSocketAddress(
107 "127.0.0.1", port));
108 future.awaitUninterruptibly();
109 session = future.getSession();
110 } else {
111 int clientPort = port;
112 for (int i = 0; i < 65536; i++) {
113 clientPort = AvailablePortFinder
114 .getNextAvailable(clientPort + 1);
115 try {
116 ConnectFuture future = connector.connect(
117 new InetSocketAddress("127.0.0.1", port),
118 new InetSocketAddress(clientPort));
119 future.awaitUninterruptibly();
120 session = future.getSession();
121 break;
122 } catch (RuntimeIoException e) {
123
124 }
125 }
126
127 if (session == null) {
128 Assert.fail("Failed to find out an appropriate local address.");
129 }
130 }
131
132
133 testConnector0(session);
134
135
136 if (useSSL) {
137 connectorSSLFilter.stopSsl(session).awaitUninterruptibly();
138
139 System.out
140 .println("-------------------------------------------------------------------------------");
141
142 testConnector0(session);
143
144 System.out
145 .println("-------------------------------------------------------------------------------");
146
147
148
149 handler.readBuf.clear();
150 IoBuffer buf = IoBuffer.allocate(1);
151 buf.put((byte) '.');
152 buf.flip();
153 session.write(buf).awaitUninterruptibly();
154
155
156 waitForResponse(handler, 1);
157
158 handler.readBuf.flip();
159 Assert.assertEquals(1, handler.readBuf.remaining());
160 Assert.assertEquals((byte) '.', handler.readBuf.get());
161
162
163 Assert.assertTrue(connectorSSLFilter.startSsl(session));
164 testConnector0(session);
165 }
166
167 session.close(true).awaitUninterruptibly();
168 }
169
170 private void testConnector0(IoSession session) throws InterruptedException {
171 EchoConnectorHandler handler = (EchoConnectorHandler) session
172 .getHandler();
173 IoBuffer readBuf = handler.readBuf;
174 readBuf.clear();
175 WriteFuture writeFuture = null;
176 for (int i = 0; i < COUNT; i++) {
177 IoBuffer buf = IoBuffer.allocate(DATA_SIZE);
178 buf.limit(DATA_SIZE);
179 fillWriteBuffer(buf, i);
180 buf.flip();
181
182 writeFuture = session.write(buf);
183
184 if (session.getService().getTransportMetadata().isConnectionless()) {
185
186 waitForResponse(handler, (i + 1) * DATA_SIZE);
187 }
188 }
189
190 writeFuture.awaitUninterruptibly();
191
192 waitForResponse(handler, DATA_SIZE * COUNT);
193
194
195
196
197
198 readBuf.flip();
199 logger.info("readBuf: " + readBuf);
200 Assert.assertEquals(DATA_SIZE * COUNT, readBuf.remaining());
201 IoBuffer expectedBuf = IoBuffer.allocate(DATA_SIZE * COUNT);
202 for (int i = 0; i < COUNT; i++) {
203 expectedBuf.limit((i + 1) * DATA_SIZE);
204 fillWriteBuffer(expectedBuf, i);
205 }
206 expectedBuf.position(0);
207
208 assertEquals(expectedBuf, readBuf);
209 }
210
211 private void waitForResponse(EchoConnectorHandler handler, int bytes)
212 throws InterruptedException {
213 for (int j = 0; j < TIMEOUT / 10; j++) {
214 if (handler.readBuf.position() >= bytes) {
215 break;
216 }
217 Thread.sleep(10);
218 }
219
220 Assert.assertEquals(bytes, handler.readBuf.position());
221 }
222
223 private void fillWriteBuffer(IoBuffer writeBuf, int i) {
224 while (writeBuf.remaining() > 0) {
225 writeBuf.put((byte) i++);
226 }
227 }
228
229 public static void main(String[] args) {
230 junit.textui.TestRunner.run(ConnectorTest.class);
231 }
232
233 private static class EchoConnectorHandler extends IoHandlerAdapter {
234 private final IoBuffer readBuf = IoBuffer.allocate(1024);
235
236 private EchoConnectorHandler() {
237 readBuf.setAutoExpand(true);
238 }
239
240 @Override
241 public void messageReceived(IoSession session, Object message) {
242 readBuf.put((IoBuffer) message);
243 }
244
245 @Override
246 public void messageSent(IoSession session, Object message) {
247 }
248
249 @Override
250 public void exceptionCaught(IoSession session, Throwable cause) {
251 logger.warn("Unexpected exception.", cause);
252 if (cause instanceof WriteException) {
253 WriteException e = (WriteException) cause;
254 logger.warn("Failed write requests: {}", e.getRequests());
255 }
256 }
257 }
258 }