1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23
24 import org.apache.mina.core.buffer.IoBuffer;
25 import org.apache.mina.core.future.ConnectFuture;
26 import org.apache.mina.core.service.IoAcceptor;
27 import org.apache.mina.core.service.IoHandler;
28 import org.apache.mina.core.service.IoHandlerAdapter;
29 import org.apache.mina.core.service.TransportMetadata;
30 import org.apache.mina.core.session.IoSession;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertFalse;
37 import static org.junit.Assert.assertTrue;
38
39
40
41
42
43
44
45
46 public abstract class AbstractTrafficControlTest {
47
48 protected int port;
49 protected IoAcceptor acceptor;
50 protected TransportMetadata transportType;
51
52 public AbstractTrafficControlTest(IoAcceptor acceptor) {
53 this.acceptor = acceptor;
54 }
55
56 @Before
57 public void setUp() throws Exception {
58 acceptor.setHandler(new ServerIoHandler());
59 acceptor.bind(createServerSocketAddress(0));
60 port = getPort(acceptor.getLocalAddress());
61 }
62
63 @After
64 public void tearDown() throws Exception {
65 acceptor.unbind();
66 acceptor.dispose();
67 }
68
69 protected abstract ConnectFuture connect(int port, IoHandler handler)
70 throws Exception;
71
72 protected abstract SocketAddress createServerSocketAddress(int port);
73 protected abstract int getPort(SocketAddress address);
74
75 @Test
76 public void testSuspendResumeReadWrite() throws Exception {
77 ConnectFuture future = connect(port, new ClientIoHandler());
78 future.awaitUninterruptibly();
79 IoSession session = future.getSession();
80
81
82
83 while (session.getAttribute("lock") == null) {
84 Thread.yield();
85 }
86
87 Object lock = session.getAttribute("lock");
88 synchronized (lock) {
89
90 write(session, "1");
91 assertEquals('1', read(session));
92 assertEquals("1", getReceived(session));
93 assertEquals("1", getSent(session));
94
95 session.suspendRead();
96
97 Thread.sleep(100);
98
99 write(session, "2");
100 assertFalse(canRead(session));
101 assertEquals("1", getReceived(session));
102 assertEquals("12", getSent(session));
103
104 session.suspendWrite();
105
106 Thread.sleep(100);
107
108 write(session, "3");
109 assertFalse(canRead(session));
110 assertEquals("1", getReceived(session));
111 assertEquals("12", getSent(session));
112
113 session.resumeRead();
114
115 Thread.sleep(100);
116
117 write(session, "4");
118 assertEquals('2', read(session));
119 assertEquals("12", getReceived(session));
120 assertEquals("12", getSent(session));
121
122 session.resumeWrite();
123
124 Thread.sleep(100);
125
126 assertEquals('3', read(session));
127 assertEquals('4', read(session));
128
129 write(session, "5");
130 assertEquals('5', read(session));
131 assertEquals("12345", getReceived(session));
132 assertEquals("12345", getSent(session));
133
134 session.suspendWrite();
135
136 Thread.sleep(100);
137
138 write(session, "6");
139 assertFalse(canRead(session));
140 assertEquals("12345", getReceived(session));
141 assertEquals("12345", getSent(session));
142
143 session.suspendRead();
144 session.resumeWrite();
145
146 Thread.sleep(100);
147
148 write(session, "7");
149 assertFalse(canRead(session));
150 assertEquals("12345", getReceived(session));
151 assertEquals("1234567", getSent(session));
152
153 session.resumeRead();
154
155 Thread.sleep(100);
156
157 assertEquals('6', read(session));
158 assertEquals('7', read(session));
159
160 assertEquals("1234567", getReceived(session));
161 assertEquals("1234567", getSent(session));
162
163 }
164
165 session.close(true).awaitUninterruptibly();
166 }
167
168 private void write(IoSession session, String s) throws Exception {
169 session.write(IoBuffer.wrap(s.getBytes("ASCII")));
170 }
171
172 private int read(IoSession session) throws Exception {
173 int pos = ((Integer) session.getAttribute("pos")).intValue();
174 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
175 Object lock = session.getAttribute("lock");
176 lock.wait(200);
177 }
178 session.setAttribute("pos", new Integer(pos + 1));
179 String received = getReceived(session);
180 assertTrue(received.length() > pos);
181 return getReceived(session).charAt(pos);
182 }
183
184 private boolean canRead(IoSession session) throws Exception {
185 int pos = ((Integer) session.getAttribute("pos")).intValue();
186 Object lock = session.getAttribute("lock");
187 lock.wait(250);
188 String received = getReceived(session);
189 return pos < received.length();
190 }
191
192 private String getReceived(IoSession session) throws Exception {
193 return session.getAttribute("received").toString();
194 }
195
196 private String getSent(IoSession session) throws Exception {
197 return session.getAttribute("sent").toString();
198 }
199
200 public static class ClientIoHandler extends IoHandlerAdapter {
201 @Override
202 public void sessionCreated(IoSession session) throws Exception {
203 super.sessionCreated(session);
204 session.setAttribute("pos", new Integer(0));
205 session.setAttribute("received", new StringBuffer());
206 session.setAttribute("sent", new StringBuffer());
207 session.setAttribute("lock", new Object());
208 }
209
210 @Override
211 public void messageReceived(IoSession session, Object message)
212 throws Exception {
213 IoBuffer buffer = (IoBuffer) message;
214 byte[] data = new byte[buffer.remaining()];
215 buffer.get(data);
216 Object lock = session.getAttribute("lock");
217 synchronized (lock) {
218 StringBuffer sb = (StringBuffer) session
219 .getAttribute("received");
220 sb.append(new String(data, "ASCII"));
221 lock.notifyAll();
222 }
223 }
224
225 @Override
226 public void messageSent(IoSession session, Object message)
227 throws Exception {
228 IoBuffer buffer = (IoBuffer) message;
229 buffer.rewind();
230 byte[] data = new byte[buffer.remaining()];
231 buffer.get(data);
232 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
233 sb.append(new String(data, "ASCII"));
234 }
235
236 }
237
238 private static class ServerIoHandler extends IoHandlerAdapter {
239 @Override
240 public void messageReceived(IoSession session, Object message)
241 throws Exception {
242
243 IoBuffer rb = (IoBuffer) message;
244 IoBuffer wb = IoBuffer.allocate(rb.remaining());
245 wb.put(rb);
246 wb.flip();
247 session.write(wb);
248 }
249 }
250 }