001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport; 021 022import java.net.SocketAddress; 023 024import org.apache.mina.core.buffer.IoBuffer; 025import org.apache.mina.core.future.ConnectFuture; 026import org.apache.mina.core.service.IoAcceptor; 027import org.apache.mina.core.service.IoHandler; 028import org.apache.mina.core.service.IoHandlerAdapter; 029import org.apache.mina.core.service.TransportMetadata; 030import org.apache.mina.core.session.IoSession; 031import org.junit.After; 032import org.junit.Before; 033import org.junit.Test; 034 035import static org.junit.Assert.assertEquals; 036import static org.junit.Assert.assertFalse; 037import static org.junit.Assert.assertTrue; 038 039/** 040 * Abstract base class for testing suspending and resuming reads and 041 * writes. 042 * 043 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 044 */ 045public abstract class AbstractTrafficControlTest { 046 047 protected int port; 048 049 protected IoAcceptor acceptor; 050 051 protected TransportMetadata transportType; 052 053 public AbstractTrafficControlTest(IoAcceptor acceptor) { 054 this.acceptor = acceptor; 055 } 056 057 @Before 058 public void setUp() throws Exception { 059 acceptor.setHandler(new ServerIoHandler()); 060 acceptor.bind(createServerSocketAddress(0)); 061 port = getPort(acceptor.getLocalAddress()); 062 } 063 064 @After 065 public void tearDown() throws Exception { 066 acceptor.unbind(); 067 acceptor.dispose(); 068 } 069 070 protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception; 071 072 protected abstract SocketAddress createServerSocketAddress(int port); 073 074 protected abstract int getPort(SocketAddress address); 075 076 @Test 077 public void testSuspendResumeReadWrite() throws Exception { 078 ConnectFuture future = connect(port, new ClientIoHandler()); 079 future.awaitUninterruptibly(); 080 IoSession session = future.getSession(); 081 082 // We wait for the sessionCreated() event is fired because we 083 // cannot guarantee that it is invoked already. 084 while (session.getAttribute("lock") == null) { 085 Thread.yield(); 086 } 087 088 Object lock = session.getAttribute("lock"); 089 synchronized (lock) { 090 091 write(session, "1"); 092 assertEquals('1', read(session)); 093 assertEquals("1", getReceived(session)); 094 assertEquals("1", getSent(session)); 095 096 session.suspendRead(); 097 098 Thread.sleep(100); 099 100 write(session, "2"); 101 assertFalse(canRead(session)); 102 assertEquals("1", getReceived(session)); 103 assertEquals("12", getSent(session)); 104 105 session.suspendWrite(); 106 107 Thread.sleep(100); 108 109 write(session, "3"); 110 assertFalse(canRead(session)); 111 assertEquals("1", getReceived(session)); 112 assertEquals("12", getSent(session)); 113 114 session.resumeRead(); 115 116 Thread.sleep(100); 117 118 write(session, "4"); 119 assertEquals('2', read(session)); 120 assertEquals("12", getReceived(session)); 121 assertEquals("12", getSent(session)); 122 123 session.resumeWrite(); 124 125 Thread.sleep(100); 126 127 assertEquals('3', read(session)); 128 assertEquals('4', read(session)); 129 130 write(session, "5"); 131 assertEquals('5', read(session)); 132 assertEquals("12345", getReceived(session)); 133 assertEquals("12345", getSent(session)); 134 135 session.suspendWrite(); 136 137 Thread.sleep(100); 138 139 write(session, "6"); 140 assertFalse(canRead(session)); 141 assertEquals("12345", getReceived(session)); 142 assertEquals("12345", getSent(session)); 143 144 session.suspendRead(); 145 session.resumeWrite(); 146 147 Thread.sleep(100); 148 149 write(session, "7"); 150 assertFalse(canRead(session)); 151 assertEquals("12345", getReceived(session)); 152 assertEquals("1234567", getSent(session)); 153 154 session.resumeRead(); 155 156 Thread.sleep(100); 157 158 assertEquals('6', read(session)); 159 assertEquals('7', read(session)); 160 161 assertEquals("1234567", getReceived(session)); 162 assertEquals("1234567", getSent(session)); 163 164 } 165 166 session.close(true).awaitUninterruptibly(); 167 } 168 169 private void write(IoSession session, String s) throws Exception { 170 session.write(IoBuffer.wrap(s.getBytes("ASCII"))); 171 } 172 173 private int read(IoSession session) throws Exception { 174 int pos = ((Integer) session.getAttribute("pos")).intValue(); 175 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) { 176 Object lock = session.getAttribute("lock"); 177 lock.wait(200); 178 } 179 session.setAttribute("pos", new Integer(pos + 1)); 180 String received = getReceived(session); 181 assertTrue(received.length() > pos); 182 return getReceived(session).charAt(pos); 183 } 184 185 private boolean canRead(IoSession session) throws Exception { 186 int pos = ((Integer) session.getAttribute("pos")).intValue(); 187 Object lock = session.getAttribute("lock"); 188 lock.wait(250); 189 String received = getReceived(session); 190 return pos < received.length(); 191 } 192 193 private String getReceived(IoSession session) throws Exception { 194 return session.getAttribute("received").toString(); 195 } 196 197 private String getSent(IoSession session) throws Exception { 198 return session.getAttribute("sent").toString(); 199 } 200 201 private static class ClientIoHandler extends IoHandlerAdapter { 202 /** 203 * Default constructor 204 */ 205 public ClientIoHandler() { 206 super(); 207 } 208 209 @Override 210 public void sessionCreated(IoSession session) throws Exception { 211 super.sessionCreated(session); 212 session.setAttribute("pos", new Integer(0)); 213 session.setAttribute("received", new StringBuffer()); 214 session.setAttribute("sent", new StringBuffer()); 215 session.setAttribute("lock", new Object()); 216 } 217 218 @Override 219 public void messageReceived(IoSession session, Object message) throws Exception { 220 IoBuffer buffer = (IoBuffer) message; 221 byte[] data = new byte[buffer.remaining()]; 222 buffer.get(data); 223 Object lock = session.getAttribute("lock"); 224 synchronized (lock) { 225 StringBuffer sb = (StringBuffer) session.getAttribute("received"); 226 sb.append(new String(data, "ASCII")); 227 lock.notifyAll(); 228 } 229 } 230 231 @Override 232 public void messageSent(IoSession session, Object message) throws Exception { 233 IoBuffer buffer = (IoBuffer) message; 234 buffer.rewind(); 235 byte[] data = new byte[buffer.remaining()]; 236 buffer.get(data); 237 StringBuffer sb = (StringBuffer) session.getAttribute("sent"); 238 sb.append(new String(data, "ASCII")); 239 } 240 241 } 242 243 private static class ServerIoHandler extends IoHandlerAdapter { 244 /** 245 * Default constructor 246 */ 247 public ServerIoHandler() { 248 super(); 249 } 250 251 @Override 252 public void messageReceived(IoSession session, Object message) throws Exception { 253 // Just echo the received bytes. 254 IoBuffer rb = (IoBuffer) message; 255 IoBuffer wb = IoBuffer.allocate(rb.remaining()); 256 wb.put(rb); 257 wb.flip(); 258 session.write(wb); 259 } 260 } 261}