1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23
24 import junit.framework.TestCase;
25
26 import org.apache.mina.core.buffer.IoBuffer;
27 import org.apache.mina.core.future.ConnectFuture;
28 import org.apache.mina.core.service.IoAcceptor;
29 import org.apache.mina.core.service.IoHandler;
30 import org.apache.mina.core.service.IoHandlerAdapter;
31 import org.apache.mina.core.service.TransportMetadata;
32 import org.apache.mina.core.session.IoSession;
33
34
35
36
37
38
39
40
41 public abstract class AbstractTrafficControlTest extends TestCase {
42
43 protected int port;
44 protected IoAcceptor acceptor;
45 protected TransportMetadata transportType;
46
47 public AbstractTrafficControlTest(IoAcceptor acceptor) {
48 this.acceptor = acceptor;
49 }
50
51 @Override
52 protected void setUp() throws Exception {
53 super.setUp();
54
55 acceptor.setHandler(new ServerIoHandler());
56 acceptor.bind(createServerSocketAddress(0));
57 port = getPort(acceptor.getLocalAddress());
58 }
59
60 @Override
61 protected void tearDown() throws Exception {
62 super.tearDown();
63 acceptor.unbind();
64 acceptor.dispose();
65 }
66
67 protected abstract ConnectFuture connect(int port, IoHandler handler)
68 throws Exception;
69
70 protected abstract SocketAddress createServerSocketAddress(int port);
71 protected abstract int getPort(SocketAddress address);
72
73 public void testSuspendResumeReadWrite() throws Exception {
74 ConnectFuture future = connect(port, new ClientIoHandler());
75 future.awaitUninterruptibly();
76 IoSession session = future.getSession();
77
78
79
80 while (session.getAttribute("lock") == null) {
81 Thread.yield();
82 }
83
84 Object lock = session.getAttribute("lock");
85 synchronized (lock) {
86
87 write(session, "1");
88 assertEquals('1', read(session));
89 assertEquals("1", getReceived(session));
90 assertEquals("1", getSent(session));
91
92 session.suspendRead();
93
94 Thread.sleep(100);
95
96 write(session, "2");
97 assertFalse(canRead(session));
98 assertEquals("1", getReceived(session));
99 assertEquals("12", getSent(session));
100
101 session.suspendWrite();
102
103 Thread.sleep(100);
104
105 write(session, "3");
106 assertFalse(canRead(session));
107 assertEquals("1", getReceived(session));
108 assertEquals("12", getSent(session));
109
110 session.resumeRead();
111
112 Thread.sleep(100);
113
114 write(session, "4");
115 assertEquals('2', read(session));
116 assertEquals("12", getReceived(session));
117 assertEquals("12", getSent(session));
118
119 session.resumeWrite();
120
121 Thread.sleep(100);
122
123 assertEquals('3', read(session));
124 assertEquals('4', read(session));
125
126 write(session, "5");
127 assertEquals('5', read(session));
128 assertEquals("12345", getReceived(session));
129 assertEquals("12345", getSent(session));
130
131 session.suspendWrite();
132
133 Thread.sleep(100);
134
135 write(session, "6");
136 assertFalse(canRead(session));
137 assertEquals("12345", getReceived(session));
138 assertEquals("12345", getSent(session));
139
140 session.suspendRead();
141 session.resumeWrite();
142
143 Thread.sleep(100);
144
145 write(session, "7");
146 assertFalse(canRead(session));
147 assertEquals("12345", getReceived(session));
148 assertEquals("1234567", getSent(session));
149
150 session.resumeRead();
151
152 Thread.sleep(100);
153
154 assertEquals('6', read(session));
155 assertEquals('7', read(session));
156
157 assertEquals("1234567", getReceived(session));
158 assertEquals("1234567", getSent(session));
159
160 }
161
162 session.close().awaitUninterruptibly();
163 }
164
165 private void write(IoSession session, String s) throws Exception {
166 session.write(IoBuffer.wrap(s.getBytes("ASCII")));
167 }
168
169 private int read(IoSession session) throws Exception {
170 int pos = ((Integer) session.getAttribute("pos")).intValue();
171 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
172 Object lock = session.getAttribute("lock");
173 lock.wait(200);
174 }
175 session.setAttribute("pos", new Integer(pos + 1));
176 String received = getReceived(session);
177 assertTrue(received.length() > pos);
178 return getReceived(session).charAt(pos);
179 }
180
181 private boolean canRead(IoSession session) throws Exception {
182 int pos = ((Integer) session.getAttribute("pos")).intValue();
183 Object lock = session.getAttribute("lock");
184 lock.wait(250);
185 String received = getReceived(session);
186 return pos < received.length();
187 }
188
189 private String getReceived(IoSession session) throws Exception {
190 return session.getAttribute("received").toString();
191 }
192
193 private String getSent(IoSession session) throws Exception {
194 return session.getAttribute("sent").toString();
195 }
196
197 public static class ClientIoHandler extends IoHandlerAdapter {
198 @Override
199 public void sessionCreated(IoSession session) throws Exception {
200 super.sessionCreated(session);
201 session.setAttribute("pos", new Integer(0));
202 session.setAttribute("received", new StringBuffer());
203 session.setAttribute("sent", new StringBuffer());
204 session.setAttribute("lock", new Object());
205 }
206
207 @Override
208 public void messageReceived(IoSession session, Object message)
209 throws Exception {
210 IoBuffer buffer = (IoBuffer) message;
211 byte[] data = new byte[buffer.remaining()];
212 buffer.get(data);
213 Object lock = session.getAttribute("lock");
214 synchronized (lock) {
215 StringBuffer sb = (StringBuffer) session
216 .getAttribute("received");
217 sb.append(new String(data, "ASCII"));
218 lock.notifyAll();
219 }
220 }
221
222 @Override
223 public void messageSent(IoSession session, Object message)
224 throws Exception {
225 IoBuffer buffer = (IoBuffer) message;
226 buffer.rewind();
227 byte[] data = new byte[buffer.remaining()];
228 buffer.get(data);
229 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
230 sb.append(new String(data, "ASCII"));
231 }
232
233 }
234
235 private static class ServerIoHandler extends IoHandlerAdapter {
236 @Override
237 public void messageReceived(IoSession session, Object message)
238 throws Exception {
239
240 IoBuffer rb = (IoBuffer) message;
241 IoBuffer wb = IoBuffer.allocate(rb.remaining());
242 wb.put(rb);
243 wb.flip();
244 session.write(wb);
245 }
246 }
247 }