1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23
24 import junit.framework.TestCase;
25
26 import org.apache.mina.common.ConnectFuture;
27 import org.apache.mina.common.IoAcceptor;
28 import org.apache.mina.common.IoBuffer;
29 import org.apache.mina.common.IoHandler;
30 import org.apache.mina.common.IoHandlerAdapter;
31 import org.apache.mina.common.IoSession;
32 import org.apache.mina.common.TransportMetadata;
33 import org.apache.mina.util.AvailablePortFinder;
34
35
36
37
38
39
40
41
42 public abstract class AbstractTrafficControlTest extends TestCase {
43 protected int port = 0;
44
45 protected IoAcceptor acceptor;
46
47 protected TransportMetadata transportType;
48
49 public AbstractTrafficControlTest(IoAcceptor acceptor) {
50 this.acceptor = acceptor;
51 }
52
53 @Override
54 protected void setUp() throws Exception {
55 super.setUp();
56
57 port = AvailablePortFinder.getNextAvailable();
58 acceptor.setHandler(new ServerIoHandler());
59 acceptor.bind(createServerSocketAddress(port));
60 }
61
62 @Override
63 protected void tearDown() throws Exception {
64 super.tearDown();
65
66 acceptor.dispose();
67 }
68
69 protected abstract ConnectFuture connect(int port, IoHandler handler)
70 throws Exception;
71
72 protected abstract SocketAddress createServerSocketAddress(int port);
73
74 public void testSuspendResumeReadWrite() throws Exception {
75 ConnectFuture future = connect(port, new ClientIoHandler());
76 future.awaitUninterruptibly();
77 IoSession session = future.getSession();
78
79
80
81 while (session.getAttribute("lock") == null) {
82 Thread.yield();
83 }
84
85 Object lock = session.getAttribute("lock");
86 synchronized (lock) {
87
88 write(session, "1");
89 assertEquals('1', read(session));
90 assertEquals("1", getReceived(session));
91 assertEquals("1", getSent(session));
92
93 session.suspendRead();
94
95 Thread.sleep(100);
96
97 write(session, "2");
98 assertFalse(canRead(session));
99 assertEquals("1", getReceived(session));
100 assertEquals("12", getSent(session));
101
102 session.suspendWrite();
103
104 Thread.sleep(100);
105
106 write(session, "3");
107 assertFalse(canRead(session));
108 assertEquals("1", getReceived(session));
109 assertEquals("12", getSent(session));
110
111 session.resumeRead();
112
113 Thread.sleep(100);
114
115 write(session, "4");
116 assertEquals('2', read(session));
117 assertEquals("12", getReceived(session));
118 assertEquals("12", getSent(session));
119
120 session.resumeWrite();
121
122 Thread.sleep(100);
123
124 assertEquals('3', read(session));
125 assertEquals('4', read(session));
126
127 write(session, "5");
128 assertEquals('5', read(session));
129 assertEquals("12345", getReceived(session));
130 assertEquals("12345", getSent(session));
131
132 session.suspendWrite();
133
134 Thread.sleep(100);
135
136 write(session, "6");
137 assertFalse(canRead(session));
138 assertEquals("12345", getReceived(session));
139 assertEquals("12345", getSent(session));
140
141 session.suspendRead();
142 session.resumeWrite();
143
144 Thread.sleep(100);
145
146 write(session, "7");
147 assertFalse(canRead(session));
148 assertEquals("12345", getReceived(session));
149 assertEquals("1234567", getSent(session));
150
151 session.resumeRead();
152
153 Thread.sleep(100);
154
155 assertEquals('6', read(session));
156 assertEquals('7', read(session));
157
158 assertEquals("1234567", getReceived(session));
159 assertEquals("1234567", getSent(session));
160
161 }
162
163 session.close().awaitUninterruptibly();
164 }
165
166 private void write(IoSession session, String s) throws Exception {
167 session.write(IoBuffer.wrap(s.getBytes("ASCII")));
168 }
169
170 private int read(IoSession session) throws Exception {
171 int pos = ((Integer) session.getAttribute("pos")).intValue();
172 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
173 Object lock = session.getAttribute("lock");
174 lock.wait(200);
175 }
176 session.setAttribute("pos", new Integer(pos + 1));
177 String received = getReceived(session);
178 assertTrue(received.length() > pos);
179 return getReceived(session).charAt(pos);
180 }
181
182 private boolean canRead(IoSession session) throws Exception {
183 int pos = ((Integer) session.getAttribute("pos")).intValue();
184 Object lock = session.getAttribute("lock");
185 lock.wait(250);
186 String received = getReceived(session);
187 return pos < received.length();
188 }
189
190 private String getReceived(IoSession session) throws Exception {
191 return session.getAttribute("received").toString();
192 }
193
194 private String getSent(IoSession session) throws Exception {
195 return session.getAttribute("sent").toString();
196 }
197
198 public static class ClientIoHandler extends IoHandlerAdapter {
199 @Override
200 public void sessionCreated(IoSession session) throws Exception {
201 super.sessionCreated(session);
202 session.setAttribute("pos", new Integer(0));
203 session.setAttribute("received", new StringBuffer());
204 session.setAttribute("sent", new StringBuffer());
205 session.setAttribute("lock", new Object());
206 }
207
208 @Override
209 public void messageReceived(IoSession session, Object message)
210 throws Exception {
211 IoBuffer buffer = (IoBuffer) message;
212 byte[] data = new byte[buffer.remaining()];
213 buffer.get(data);
214 Object lock = session.getAttribute("lock");
215 synchronized (lock) {
216 StringBuffer sb = (StringBuffer) session
217 .getAttribute("received");
218 sb.append(new String(data, "ASCII"));
219 lock.notifyAll();
220 }
221 }
222
223 @Override
224 public void messageSent(IoSession session, Object message)
225 throws Exception {
226 IoBuffer buffer = (IoBuffer) message;
227 buffer.rewind();
228 byte[] data = new byte[buffer.remaining()];
229 buffer.get(data);
230 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
231 sb.append(new String(data, "ASCII"));
232 }
233
234 }
235
236 private static class ServerIoHandler extends IoHandlerAdapter {
237 @Override
238 public void messageReceived(IoSession session, Object message)
239 throws Exception {
240
241 IoBuffer rb = (IoBuffer) message;
242 IoBuffer wb = IoBuffer.allocate(rb.remaining());
243 wb.put(rb);
244 wb.flip();
245 session.write(wb);
246 }
247 }
248 }