/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.example.echoserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.net.InetSocketAddress;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteException;
import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioDatagramConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.mina.util.AvailablePortFinder;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests echo server example.
 * <p>
 * Each test creates a connector, connects it to {@code 127.0.0.1} on
 * {@code port}, writes {@link #COUNT} messages of {@link #DATA_SIZE} bytes and
 * checks that exactly the same bytes are read back. {@code port} and
 * {@code useSSL} are not declared in this file; they are presumably inherited
 * from {@link AbstractTest}.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public class ConnectorTest extends AbstractTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorTest.class);

    /** Nominal upper bound, in milliseconds, on how long a response is awaited. */
    private static final int TIMEOUT = 10000; // 10 seconds

    /** Pause, in milliseconds, between two polls of the read buffer. */
    private static final int POLL_INTERVAL = 10;

    /** Number of messages written per echo round. */
    private static final int COUNT = 10;

    /** Size, in bytes, of each written message. */
    private static final int DATA_SIZE = 16;

    /** Upper bound on the number of local ports tried when binding explicitly. */
    private static final int MAX_BIND_ATTEMPTS = 65536;

    /** Visual separator printed around the echo round run after TLS was stopped. */
    private static final String SEPARATOR =
            "-------------------------------------------------------------------------------";

    private EchoConnectorHandler handler;

    private SslFilter connectorSSLFilter;

    public ConnectorTest() {
        // Do nothing
    }

    /**
     * Runs the superclass set-up, then creates a fresh handler and a
     * client-mode SSL filter for the test about to run.
     *
     * @throws Exception if the superclass set-up or the SSL context creation fails
     */
    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
        handler = new EchoConnectorHandler();
        connectorSSLFilter = new SslFilter(BogusSslContextFactory.getInstance(false));
        connectorSSLFilter.setUseClientMode(true); // set client mode
    }

    @Test
    public void testTCP() throws Exception {
        IoConnector connector = new NioSocketConnector();
        testConnector(connector);
    }

    @Test
    public void testTCPWithSSL() throws Exception {
        useSSL = true;
        // Create a connector
        IoConnector connector = new NioSocketConnector();

        // Add an SSL filter to connector
        connector.getFilterChain().addLast("SSL", connectorSSLFilter);
        testConnector(connector);
    }

    @Test
    public void testUDP() throws Exception {
        IoConnector connector = new NioDatagramConnector();
        testConnector(connector);
    }

    /**
     * Runs the echo scenario twice on the given connector: first without and
     * then with an explicitly bound local address. The connector is disposed
     * afterwards, even when the scenario fails, so that it does not outlive
     * the test.
     *
     * @param connector the connector to exercise; it must not be reused by the caller
     * @throws Exception if connecting, writing or waiting fails
     */
    private void testConnector(IoConnector connector) throws Exception {
        try {
            connector.setHandler(handler);

            // Without localAddress
            testConnector(connector, false);

            // With localAddress
            testConnector(connector, true);
        } finally {
            connector.dispose();
        }
    }

    /**
     * Connects to the echo server, runs the basic echo round and, when
     * {@code useSSL} is set, the TLS stop/restart scenario. The session is
     * closed afterwards regardless of the outcome.
     *
     * @param connector       the connector used to open the session
     * @param useLocalAddress whether to bind the client side to an explicit local port
     * @throws Exception if connecting, writing or waiting fails
     */
    private void testConnector(IoConnector connector, boolean useLocalAddress)
            throws Exception {
        InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", port);

        IoSession session;
        if (useLocalAddress) {
            session = connectWithLocalAddress(connector, remoteAddress);
        } else {
            ConnectFuture future = connector.connect(remoteAddress);
            future.awaitUninterruptibly();
            session = future.getSession();
        }

        try {
            // Run a basic connector test.
            testConnector0(session);

            // Send closeNotify to test TLS closure if it is TLS connection.
            if (useSSL) {
                exerciseTlsRestart(session);
            }
        } finally {
            // Close even when an assertion above failed, so the session is not leaked.
            session.close(true).awaitUninterruptibly();
        }
    }

    /**
     * Connects to {@code remoteAddress} from an explicitly chosen local port,
     * trying successive available ports above {@code port} until a connection
     * attempt succeeds. Fails the test when no attempt succeeds.
     *
     * @param connector     the connector used to open the session
     * @param remoteAddress the address of the echo server
     * @return the connected session
     */
    private IoSession connectWithLocalAddress(IoConnector connector,
            InetSocketAddress remoteAddress) {
        IoSession session = null;
        int clientPort = port;

        for (int attempt = 0; attempt < MAX_BIND_ATTEMPTS; attempt++) {
            clientPort = AvailablePortFinder.getNextAvailable(clientPort + 1);
            try {
                ConnectFuture future = connector.connect(remoteAddress,
                        new InetSocketAddress(clientPort));
                future.awaitUninterruptibly();
                session = future.getSession();
                break;
            } catch (RuntimeIoException ignored) {
                // Try again until we succeed to bind.
            }
        }

        if (session == null) {
            fail("Failed to find out an appropriate local address.");
        }

        return session;
    }

    /**
     * Stops TLS on the session, checks that the echo still works, performs the
     * one-byte {@code '.'} exchange, then starts TLS again and runs the echo
     * round once more.
     *
     * @param session the connected session whose filter chain holds the connector SSL filter
     * @throws Exception if stopping or starting TLS, writing or waiting fails
     */
    private void exerciseTlsRestart(IoSession session) throws Exception {
        connectorSSLFilter.stopSsl(session).awaitUninterruptibly();

        System.out.println(SEPARATOR);
        // Test again after we finished TLS session.
        testConnector0(session);
        System.out.println(SEPARATOR);

        // Test if we can enter TLS mode again.
        //// Send StartTLS request.
        handler.readBuf.clear();
        IoBuffer buf = IoBuffer.allocate(1);
        buf.put((byte) '.');
        buf.flip();
        session.write(buf).awaitUninterruptibly();

        //// Wait for StartTLS response.
        waitForResponse(handler, 1);

        handler.readBuf.flip();
        assertEquals(1, handler.readBuf.remaining());
        assertEquals((byte) '.', handler.readBuf.get());

        // Now start TLS connection
        assertTrue(connectorSSLFilter.startSsl(session));
        testConnector0(session);
    }

    /**
     * Writes {@link #COUNT} messages of {@link #DATA_SIZE} bytes to the session
     * and asserts that the handler read back exactly the bytes that were sent.
     *
     * @param session the connected session; its handler must be an {@link EchoConnectorHandler}
     * @throws InterruptedException if the thread is interrupted while waiting for the echo
     */
    private void testConnector0(IoSession session) throws InterruptedException {
        EchoConnectorHandler handler = (EchoConnectorHandler) session.getHandler();
        IoBuffer readBuf = handler.readBuf;
        readBuf.clear();

        // The transport type does not change between writes, so look it up once.
        boolean connectionless = session.getService().getTransportMetadata()
                .isConnectionless();

        WriteFuture writeFuture = null;
        for (int i = 0; i < COUNT; i++) {
            IoBuffer buf = IoBuffer.allocate(DATA_SIZE);
            buf.limit(DATA_SIZE);
            fillWriteBuffer(buf, i);
            buf.flip();

            writeFuture = session.write(buf);

            if (connectionless) {
                // This will align message arrival order in connectionless transport types
                waitForResponse(handler, (i + 1) * DATA_SIZE);
            }
        }

        // Only the last write is awaited explicitly.
        if (writeFuture != null) {
            writeFuture.awaitUninterruptibly();
        }

        waitForResponse(handler, DATA_SIZE * COUNT);

        // Assert data
        //// Please note that BufferOverflowException can be thrown
        //// in SocketIoProcessor if there was a read timeout because
        //// we share readBuf.
        readBuf.flip();
        LOGGER.info("readBuf: {}", readBuf);
        assertEquals(DATA_SIZE * COUNT, readBuf.remaining());

        // Rebuild the expected stream chunk by chunk, exactly as it was written.
        IoBuffer expectedBuf = IoBuffer.allocate(DATA_SIZE * COUNT);
        for (int i = 0; i < COUNT; i++) {
            expectedBuf.limit((i + 1) * DATA_SIZE);
            fillWriteBuffer(expectedBuf, i);
        }
        expectedBuf.position(0);

        isEquals(expectedBuf, readBuf);
    }

    /**
     * Polls the handler's read buffer every {@link #POLL_INTERVAL} milliseconds,
     * for at most {@code TIMEOUT / POLL_INTERVAL} polls, until at least
     * {@code bytes} bytes were received, then asserts that exactly
     * {@code bytes} bytes are present.
     *
     * @param handler the handler accumulating the received data
     * @param bytes   the expected number of received bytes
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void waitForResponse(EchoConnectorHandler handler, int bytes)
            throws InterruptedException {
        int maxPolls = TIMEOUT / POLL_INTERVAL;
        for (int poll = 0; poll < maxPolls; poll++) {
            if (handler.readBuf.position() >= bytes) {
                break;
            }
            Thread.sleep(POLL_INTERVAL);
        }

        assertEquals(bytes, handler.readBuf.position());
    }

    /**
     * Fills the remaining space of the buffer with consecutive byte values
     * starting at {@code (byte) i}.
     *
     * @param writeBuf the buffer to fill up to its limit
     * @param i        the value of the first byte written
     */
    private void fillWriteBuffer(IoBuffer writeBuf, int i) {
        while (writeBuf.hasRemaining()) {
            writeBuf.put((byte) i++);
        }
    }

    /**
     * Handler that appends every received message to {@link #readBuf} and logs
     * any exception reported for the session.
     */
    private static class EchoConnectorHandler extends IoHandlerAdapter {
        /** Accumulates everything received; inspected directly by the test methods. */
        private final IoBuffer readBuf = IoBuffer.allocate(1024);

        private EchoConnectorHandler() {
            readBuf.setAutoExpand(true);
        }

        @Override
        public void messageReceived(IoSession session, Object message) {
            readBuf.put((IoBuffer) message);
        }

        @Override
        public void messageSent(IoSession session, Object message) {
            // Nothing to do when a message has been sent.
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) {
            LOGGER.warn("Unexpected exception.", cause);
            if (cause instanceof WriteException) {
                WriteException e = (WriteException) cause;
                LOGGER.warn("Failed write requests: {}", e.getRequests());
            }
        }
    }
}