1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  
24  import org.apache.mina.core.buffer.IoBuffer;
25  import org.apache.mina.core.future.ConnectFuture;
26  import org.apache.mina.core.service.IoAcceptor;
27  import org.apache.mina.core.service.IoHandler;
28  import org.apache.mina.core.service.IoHandlerAdapter;
29  import org.apache.mina.core.service.TransportMetadata;
30  import org.apache.mina.core.session.IoSession;
31  import org.junit.After;
32  import org.junit.Before;
33  import org.junit.Test;
34  
35  import static org.junit.Assert.assertEquals;
36  import static org.junit.Assert.assertFalse;
37  import static org.junit.Assert.assertTrue;
38  
39  /**
40   * Abstract base class for testing suspending and resuming reads and
41   * writes.
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev$, $Date$
45   */
46  public abstract class AbstractTrafficControlTest {
47  
48      protected int port;
49      protected IoAcceptor acceptor;
50      protected TransportMetadata transportType;
51  
52      public AbstractTrafficControlTest(IoAcceptor acceptor) {
53          this.acceptor = acceptor;
54      }
55  
56      @Before
57      public void setUp() throws Exception {
58          acceptor.setHandler(new ServerIoHandler());
59          acceptor.bind(createServerSocketAddress(0));
60          port = getPort(acceptor.getLocalAddress());
61      }
62  
63      @After
64      public void tearDown() throws Exception {
65          acceptor.unbind();
66          acceptor.dispose();
67      }
68  
69      protected abstract ConnectFuture connect(int port, IoHandler handler)
70              throws Exception;
71  
72      protected abstract SocketAddress createServerSocketAddress(int port);
73      protected abstract int getPort(SocketAddress address);
74  
75      @Test
76      public void testSuspendResumeReadWrite() throws Exception {
77          ConnectFuture future = connect(port, new ClientIoHandler());
78          future.awaitUninterruptibly();
79          IoSession session = future.getSession();
80  
81          // We wait for the sessionCreated() event is fired because we
82          // cannot guarantee that it is invoked already.
83          while (session.getAttribute("lock") == null) {
84              Thread.yield();
85          }
86          
87          Object lock = session.getAttribute("lock");
88          synchronized (lock) {
89  
90              write(session, "1");
91              assertEquals('1', read(session));
92              assertEquals("1", getReceived(session));
93              assertEquals("1", getSent(session));
94  
95              session.suspendRead();
96  
97              Thread.sleep(100);
98  
99              write(session, "2");
100             assertFalse(canRead(session));
101             assertEquals("1", getReceived(session));
102             assertEquals("12", getSent(session));
103 
104             session.suspendWrite();
105 
106             Thread.sleep(100);
107 
108             write(session, "3");
109             assertFalse(canRead(session));
110             assertEquals("1", getReceived(session));
111             assertEquals("12", getSent(session));
112 
113             session.resumeRead();
114 
115             Thread.sleep(100);
116 
117             write(session, "4");
118             assertEquals('2', read(session));
119             assertEquals("12", getReceived(session));
120             assertEquals("12", getSent(session));
121 
122             session.resumeWrite();
123 
124             Thread.sleep(100);
125 
126             assertEquals('3', read(session));
127             assertEquals('4', read(session));
128 
129             write(session, "5");
130             assertEquals('5', read(session));
131             assertEquals("12345", getReceived(session));
132             assertEquals("12345", getSent(session));
133 
134             session.suspendWrite();
135 
136             Thread.sleep(100);
137 
138             write(session, "6");
139             assertFalse(canRead(session));
140             assertEquals("12345", getReceived(session));
141             assertEquals("12345", getSent(session));
142 
143             session.suspendRead();
144             session.resumeWrite();
145 
146             Thread.sleep(100);
147 
148             write(session, "7");
149             assertFalse(canRead(session));
150             assertEquals("12345", getReceived(session));
151             assertEquals("1234567", getSent(session));
152 
153             session.resumeRead();
154 
155             Thread.sleep(100);
156 
157             assertEquals('6', read(session));
158             assertEquals('7', read(session));
159 
160             assertEquals("1234567", getReceived(session));
161             assertEquals("1234567", getSent(session));
162 
163         }
164 
165         session.close(true).awaitUninterruptibly();
166     }
167 
168     private void write(IoSession session, String s) throws Exception {
169         session.write(IoBuffer.wrap(s.getBytes("ASCII")));
170     }
171 
172     private int read(IoSession session) throws Exception {
173         int pos = ((Integer) session.getAttribute("pos")).intValue();
174         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
175             Object lock = session.getAttribute("lock");
176             lock.wait(200);
177         }
178         session.setAttribute("pos", new Integer(pos + 1));
179         String received = getReceived(session);
180         assertTrue(received.length() > pos);
181         return getReceived(session).charAt(pos);
182     }
183 
184     private boolean canRead(IoSession session) throws Exception {
185         int pos = ((Integer) session.getAttribute("pos")).intValue();
186         Object lock = session.getAttribute("lock");
187         lock.wait(250);
188         String received = getReceived(session);
189         return pos < received.length();
190     }
191 
192     private String getReceived(IoSession session) throws Exception {
193         return session.getAttribute("received").toString();
194     }
195 
196     private String getSent(IoSession session) throws Exception {
197         return session.getAttribute("sent").toString();
198     }
199 
200     public static class ClientIoHandler extends IoHandlerAdapter {
201         @Override
202         public void sessionCreated(IoSession session) throws Exception {
203             super.sessionCreated(session);
204             session.setAttribute("pos", new Integer(0));
205             session.setAttribute("received", new StringBuffer());
206             session.setAttribute("sent", new StringBuffer());
207             session.setAttribute("lock", new Object());
208         }
209 
210         @Override
211         public void messageReceived(IoSession session, Object message)
212                 throws Exception {
213             IoBuffer buffer = (IoBuffer) message;
214             byte[] data = new byte[buffer.remaining()];
215             buffer.get(data);
216             Object lock = session.getAttribute("lock");
217             synchronized (lock) {
218                 StringBuffer sb = (StringBuffer) session
219                         .getAttribute("received");
220                 sb.append(new String(data, "ASCII"));
221                 lock.notifyAll();
222             }
223         }
224 
225         @Override
226         public void messageSent(IoSession session, Object message)
227                 throws Exception {
228             IoBuffer buffer = (IoBuffer) message;
229             buffer.rewind();
230             byte[] data = new byte[buffer.remaining()];
231             buffer.get(data);
232             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
233             sb.append(new String(data, "ASCII"));
234         }
235 
236     }
237 
238     private static class ServerIoHandler extends IoHandlerAdapter {
239         @Override
240         public void messageReceived(IoSession session, Object message)
241                 throws Exception {
242             // Just echo the received bytes.
243             IoBuffer rb = (IoBuffer) message;
244             IoBuffer wb = IoBuffer.allocate(rb.remaining());
245             wb.put(rb);
246             wb.flip();
247             session.write(wb);
248         }
249     }
250 }