1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.example.echoserver;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.net.InetSocketAddress;
27
28 import org.apache.mina.core.RuntimeIoException;
29 import org.apache.mina.core.buffer.IoBuffer;
30 import org.apache.mina.core.future.ConnectFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoHandlerAdapter;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteException;
36 import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
37 import org.apache.mina.filter.ssl.SslFilter;
38 import org.apache.mina.transport.socket.nio.NioDatagramConnector;
39 import org.apache.mina.transport.socket.nio.NioSocketConnector;
40 import org.apache.mina.util.AvailablePortFinder;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
47
48
49
50
51 public class ConnectorTest extends AbstractTest {
52 private final static Logger LOGGER = LoggerFactory.getLogger(ConnectorTest.class);
53
54 private static final int TIMEOUT = 10000;
55
56 private final int COUNT = 10;
57
58 private final int DATA_SIZE = 16;
59
60 private EchoConnectorHandler handler;
61 private SslFilter connectorSSLFilter;
62
63 public ConnectorTest() {
64
65 }
66
67 @Before
68 public void setUp() throws Exception {
69 super.setUp();
70 handler = new EchoConnectorHandler();
71 connectorSSLFilter = new SslFilter(BogusSslContextFactory
72 .getInstance(false));
73 connectorSSLFilter.setUseClientMode(true);
74 }
75
76 @Test
77 public void testTCP() throws Exception {
78 IoConnector connector = new NioSocketConnector();
79 testConnector(connector);
80 }
81
82 @Test
83 public void testTCPWithSSL() throws Exception {
84 useSSL = true;
85
86 IoConnector connector = new NioSocketConnector();
87
88
89 connector.getFilterChain().addLast("SSL", connectorSSLFilter);
90 testConnector(connector);
91 }
92
93 @Test
94 public void testUDP() throws Exception {
95 IoConnector connector = new NioDatagramConnector();
96 testConnector(connector);
97 }
98
99 private void testConnector(IoConnector connector) throws Exception {
100 connector.setHandler(handler);
101
102
103 testConnector(connector, false);
104
105
106 testConnector(connector, true);
107 }
108
109 private void testConnector(IoConnector connector, boolean useLocalAddress)
110 throws Exception {
111 IoSession session = null;
112 if (!useLocalAddress) {
113 ConnectFuture future = connector.connect(new InetSocketAddress(
114 "127.0.0.1", port));
115 future.awaitUninterruptibly();
116 session = future.getSession();
117 } else {
118 int clientPort = port;
119 for (int i = 0; i < 65536; i++) {
120 clientPort = AvailablePortFinder
121 .getNextAvailable(clientPort + 1);
122 try {
123 ConnectFuture future = connector.connect(
124 new InetSocketAddress("127.0.0.1", port),
125 new InetSocketAddress(clientPort));
126 future.awaitUninterruptibly();
127 session = future.getSession();
128 break;
129 } catch (RuntimeIoException e) {
130
131 }
132 }
133
134 if (session == null) {
135 fail("Failed to find out an appropriate local address.");
136 }
137 }
138
139
140 testConnector0(session);
141
142
143 if (useSSL) {
144 connectorSSLFilter.stopSsl(session).awaitUninterruptibly();
145
146 System.out
147 .println("-------------------------------------------------------------------------------");
148
149 testConnector0(session);
150
151 System.out
152 .println("-------------------------------------------------------------------------------");
153
154
155
156 handler.readBuf.clear();
157 IoBuffer buf = IoBuffer.allocate(1);
158 buf.put((byte) '.');
159 buf.flip();
160 session.write(buf).awaitUninterruptibly();
161
162
163 waitForResponse(handler, 1);
164
165 handler.readBuf.flip();
166 assertEquals(1, handler.readBuf.remaining());
167 assertEquals((byte) '.', handler.readBuf.get());
168
169
170 assertTrue(connectorSSLFilter.startSsl(session));
171 testConnector0(session);
172 }
173
174 session.close(true).awaitUninterruptibly();
175 }
176
177 private void testConnector0(IoSession session) throws InterruptedException {
178 EchoConnectorHandler handler = (EchoConnectorHandler) session
179 .getHandler();
180 IoBuffer readBuf = handler.readBuf;
181 readBuf.clear();
182 WriteFuture writeFuture = null;
183 for (int i = 0; i < COUNT; i++) {
184 IoBuffer buf = IoBuffer.allocate(DATA_SIZE);
185 buf.limit(DATA_SIZE);
186 fillWriteBuffer(buf, i);
187 buf.flip();
188
189 writeFuture = session.write(buf);
190
191 if (session.getService().getTransportMetadata().isConnectionless()) {
192
193 waitForResponse(handler, (i + 1) * DATA_SIZE);
194 }
195 }
196
197 writeFuture.awaitUninterruptibly();
198
199 waitForResponse(handler, DATA_SIZE * COUNT);
200
201
202
203
204
205 readBuf.flip();
206 LOGGER.info("readBuf: " + readBuf);
207 assertEquals(DATA_SIZE * COUNT, readBuf.remaining());
208 IoBuffer expectedBuf = IoBuffer.allocate(DATA_SIZE * COUNT);
209
210 for (int i = 0; i < COUNT; i++) {
211 expectedBuf.limit((i + 1) * DATA_SIZE);
212 fillWriteBuffer(expectedBuf, i);
213 }
214
215 expectedBuf.position(0);
216
217 isEquals(expectedBuf, readBuf);
218 }
219
220 private void waitForResponse(EchoConnectorHandler handler, int bytes)
221 throws InterruptedException {
222 for (int j = 0; j < TIMEOUT / 10; j++) {
223 if (handler.readBuf.position() >= bytes) {
224 break;
225 }
226 Thread.sleep(10);
227 }
228
229 assertEquals(bytes, handler.readBuf.position());
230 }
231
232 private void fillWriteBuffer(IoBuffer writeBuf, int i) {
233 while (writeBuf.remaining() > 0) {
234 writeBuf.put((byte) i++);
235 }
236 }
237
238 private static class EchoConnectorHandler extends IoHandlerAdapter {
239 private final IoBuffer readBuf = IoBuffer.allocate(1024);
240
241 private EchoConnectorHandler() {
242 readBuf.setAutoExpand(true);
243 }
244
245 @Override
246 public void messageReceived(IoSession session, Object message) {
247 readBuf.put((IoBuffer) message);
248 }
249
250 @Override
251 public void messageSent(IoSession session, Object message) {
252 }
253
254 @Override
255 public void exceptionCaught(IoSession session, Throwable cause) {
256 LOGGER.warn("Unexpected exception.", cause);
257 if (cause instanceof WriteException) {
258 WriteException e = (WriteException) cause;
259 LOGGER.warn("Failed write requests: {}", e.getRequests());
260 }
261 }
262 }
263 }