1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23
24 import org.apache.mina.core.buffer.IoBuffer;
25 import org.apache.mina.core.future.ConnectFuture;
26 import org.apache.mina.core.service.IoAcceptor;
27 import org.apache.mina.core.service.IoHandler;
28 import org.apache.mina.core.service.IoHandlerAdapter;
29 import org.apache.mina.core.service.TransportMetadata;
30 import org.apache.mina.core.session.IoSession;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertFalse;
37 import static org.junit.Assert.assertTrue;
38
39
40
41
42
43
44
45 public abstract class AbstractTrafficControlTest {
46
47 protected int port;
48 protected IoAcceptor acceptor;
49 protected TransportMetadata transportType;
50
51 public AbstractTrafficControlTest(IoAcceptor acceptor) {
52 this.acceptor = acceptor;
53 }
54
55 @Before
56 public void setUp() throws Exception {
57 acceptor.setHandler(new ServerIoHandler());
58 acceptor.bind(createServerSocketAddress(0));
59 port = getPort(acceptor.getLocalAddress());
60 }
61
62 @After
63 public void tearDown() throws Exception {
64 acceptor.unbind();
65 acceptor.dispose();
66 }
67
68 protected abstract ConnectFuture connect(int port, IoHandler handler)
69 throws Exception;
70
71 protected abstract SocketAddress createServerSocketAddress(int port);
72 protected abstract int getPort(SocketAddress address);
73
74 @Test
75 public void testSuspendResumeReadWrite() throws Exception {
76 ConnectFuture future = connect(port, new ClientIoHandler());
77 future.awaitUninterruptibly();
78 IoSession session = future.getSession();
79
80
81
82 while (session.getAttribute("lock") == null) {
83 Thread.yield();
84 }
85
86 Object lock = session.getAttribute("lock");
87 synchronized (lock) {
88
89 write(session, "1");
90 assertEquals('1', read(session));
91 assertEquals("1", getReceived(session));
92 assertEquals("1", getSent(session));
93
94 session.suspendRead();
95
96 Thread.sleep(100);
97
98 write(session, "2");
99 assertFalse(canRead(session));
100 assertEquals("1", getReceived(session));
101 assertEquals("12", getSent(session));
102
103 session.suspendWrite();
104
105 Thread.sleep(100);
106
107 write(session, "3");
108 assertFalse(canRead(session));
109 assertEquals("1", getReceived(session));
110 assertEquals("12", getSent(session));
111
112 session.resumeRead();
113
114 Thread.sleep(100);
115
116 write(session, "4");
117 assertEquals('2', read(session));
118 assertEquals("12", getReceived(session));
119 assertEquals("12", getSent(session));
120
121 session.resumeWrite();
122
123 Thread.sleep(100);
124
125 assertEquals('3', read(session));
126 assertEquals('4', read(session));
127
128 write(session, "5");
129 assertEquals('5', read(session));
130 assertEquals("12345", getReceived(session));
131 assertEquals("12345", getSent(session));
132
133 session.suspendWrite();
134
135 Thread.sleep(100);
136
137 write(session, "6");
138 assertFalse(canRead(session));
139 assertEquals("12345", getReceived(session));
140 assertEquals("12345", getSent(session));
141
142 session.suspendRead();
143 session.resumeWrite();
144
145 Thread.sleep(100);
146
147 write(session, "7");
148 assertFalse(canRead(session));
149 assertEquals("12345", getReceived(session));
150 assertEquals("1234567", getSent(session));
151
152 session.resumeRead();
153
154 Thread.sleep(100);
155
156 assertEquals('6', read(session));
157 assertEquals('7', read(session));
158
159 assertEquals("1234567", getReceived(session));
160 assertEquals("1234567", getSent(session));
161
162 }
163
164 session.close(true).awaitUninterruptibly();
165 }
166
167 private void write(IoSession session, String s) throws Exception {
168 session.write(IoBuffer.wrap(s.getBytes("ASCII")));
169 }
170
171 private int read(IoSession session) throws Exception {
172 int pos = ((Integer) session.getAttribute("pos")).intValue();
173 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
174 Object lock = session.getAttribute("lock");
175 lock.wait(200);
176 }
177 session.setAttribute("pos", new Integer(pos + 1));
178 String received = getReceived(session);
179 assertTrue(received.length() > pos);
180 return getReceived(session).charAt(pos);
181 }
182
183 private boolean canRead(IoSession session) throws Exception {
184 int pos = ((Integer) session.getAttribute("pos")).intValue();
185 Object lock = session.getAttribute("lock");
186 lock.wait(250);
187 String received = getReceived(session);
188 return pos < received.length();
189 }
190
191 private String getReceived(IoSession session) throws Exception {
192 return session.getAttribute("received").toString();
193 }
194
195 private String getSent(IoSession session) throws Exception {
196 return session.getAttribute("sent").toString();
197 }
198
199 private static class ClientIoHandler extends IoHandlerAdapter {
200
201
202
203 public ClientIoHandler() {
204 super();
205 }
206
207 @Override
208 public void sessionCreated(IoSession session) throws Exception {
209 super.sessionCreated(session);
210 session.setAttribute("pos", new Integer(0));
211 session.setAttribute("received", new StringBuffer());
212 session.setAttribute("sent", new StringBuffer());
213 session.setAttribute("lock", new Object());
214 }
215
216 @Override
217 public void messageReceived(IoSession session, Object message)
218 throws Exception {
219 IoBuffer buffer = (IoBuffer) message;
220 byte[] data = new byte[buffer.remaining()];
221 buffer.get(data);
222 Object lock = session.getAttribute("lock");
223 synchronized (lock) {
224 StringBuffer sb = (StringBuffer) session
225 .getAttribute("received");
226 sb.append(new String(data, "ASCII"));
227 lock.notifyAll();
228 }
229 }
230
231 @Override
232 public void messageSent(IoSession session, Object message)
233 throws Exception {
234 IoBuffer buffer = (IoBuffer) message;
235 buffer.rewind();
236 byte[] data = new byte[buffer.remaining()];
237 buffer.get(data);
238 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
239 sb.append(new String(data, "ASCII"));
240 }
241
242 }
243
244 private static class ServerIoHandler extends IoHandlerAdapter {
245
246
247
248 public ServerIoHandler() {
249 super();
250 }
251
252 @Override
253 public void messageReceived(IoSession session, Object message)
254 throws Exception {
255
256 IoBuffer rb = (IoBuffer) message;
257 IoBuffer wb = IoBuffer.allocate(rb.remaining());
258 wb.put(rb);
259 wb.flip();
260 session.write(wb);
261 }
262 }
263 }