1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  
24  import junit.framework.TestCase;
25  
26  import org.apache.mina.common.ConnectFuture;
27  import org.apache.mina.common.IoAcceptor;
28  import org.apache.mina.common.IoBuffer;
29  import org.apache.mina.common.IoHandler;
30  import org.apache.mina.common.IoHandlerAdapter;
31  import org.apache.mina.common.IoSession;
32  import org.apache.mina.common.TransportMetadata;
33  import org.apache.mina.util.AvailablePortFinder;
34  
35  /**
36   * Abstract base class for testing suspending and resuming reads and
37   * writes.
38   *
39   * @author The Apache MINA Project (dev@mina.apache.org)
40   * @version $Rev$, $Date$
41   */
42  public abstract class AbstractTrafficControlTest extends TestCase {
43      protected int port = 0;
44  
45      protected IoAcceptor acceptor;
46  
47      protected TransportMetadata transportType;
48  
49      public AbstractTrafficControlTest(IoAcceptor acceptor) {
50          this.acceptor = acceptor;
51      }
52  
53      @Override
54      protected void setUp() throws Exception {
55          super.setUp();
56  
57          port = AvailablePortFinder.getNextAvailable();
58          acceptor.setHandler(new ServerIoHandler());
59          acceptor.bind(createServerSocketAddress(port));
60      }
61  
62      @Override
63      protected void tearDown() throws Exception {
64          super.tearDown();
65  
66          acceptor.dispose();
67      }
68  
69      protected abstract ConnectFuture connect(int port, IoHandler handler)
70              throws Exception;
71  
72      protected abstract SocketAddress createServerSocketAddress(int port);
73  
74      public void testSuspendResumeReadWrite() throws Exception {
75          ConnectFuture future = connect(port, new ClientIoHandler());
76          future.awaitUninterruptibly();
77          IoSession session = future.getSession();
78  
79          // We wait for the sessionCreated() event is fired because we
80          // cannot guarantee that it is invoked already.
81          while (session.getAttribute("lock") == null) {
82              Thread.yield();
83          }
84  
85          Object lock = session.getAttribute("lock");
86          synchronized (lock) {
87  
88              write(session, "1");
89              assertEquals('1', read(session));
90              assertEquals("1", getReceived(session));
91              assertEquals("1", getSent(session));
92  
93              session.suspendRead();
94  
95              Thread.sleep(100);
96  
97              write(session, "2");
98              assertFalse(canRead(session));
99              assertEquals("1", getReceived(session));
100             assertEquals("12", getSent(session));
101 
102             session.suspendWrite();
103 
104             Thread.sleep(100);
105 
106             write(session, "3");
107             assertFalse(canRead(session));
108             assertEquals("1", getReceived(session));
109             assertEquals("12", getSent(session));
110 
111             session.resumeRead();
112 
113             Thread.sleep(100);
114 
115             write(session, "4");
116             assertEquals('2', read(session));
117             assertEquals("12", getReceived(session));
118             assertEquals("12", getSent(session));
119 
120             session.resumeWrite();
121 
122             Thread.sleep(100);
123 
124             assertEquals('3', read(session));
125             assertEquals('4', read(session));
126 
127             write(session, "5");
128             assertEquals('5', read(session));
129             assertEquals("12345", getReceived(session));
130             assertEquals("12345", getSent(session));
131 
132             session.suspendWrite();
133 
134             Thread.sleep(100);
135 
136             write(session, "6");
137             assertFalse(canRead(session));
138             assertEquals("12345", getReceived(session));
139             assertEquals("12345", getSent(session));
140 
141             session.suspendRead();
142             session.resumeWrite();
143 
144             Thread.sleep(100);
145 
146             write(session, "7");
147             assertFalse(canRead(session));
148             assertEquals("12345", getReceived(session));
149             assertEquals("1234567", getSent(session));
150 
151             session.resumeRead();
152 
153             Thread.sleep(100);
154 
155             assertEquals('6', read(session));
156             assertEquals('7', read(session));
157 
158             assertEquals("1234567", getReceived(session));
159             assertEquals("1234567", getSent(session));
160 
161         }
162 
163         session.close().awaitUninterruptibly();
164     }
165 
166     private void write(IoSession session, String s) throws Exception {
167         session.write(IoBuffer.wrap(s.getBytes("ASCII")));
168     }
169 
170     private int read(IoSession session) throws Exception {
171         int pos = ((Integer) session.getAttribute("pos")).intValue();
172         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
173             Object lock = session.getAttribute("lock");
174             lock.wait(200);
175         }
176         session.setAttribute("pos", new Integer(pos + 1));
177         String received = getReceived(session);
178         assertTrue(received.length() > pos);
179         return getReceived(session).charAt(pos);
180     }
181 
182     private boolean canRead(IoSession session) throws Exception {
183         int pos = ((Integer) session.getAttribute("pos")).intValue();
184         Object lock = session.getAttribute("lock");
185         lock.wait(250);
186         String received = getReceived(session);
187         return pos < received.length();
188     }
189 
190     private String getReceived(IoSession session) throws Exception {
191         return session.getAttribute("received").toString();
192     }
193 
194     private String getSent(IoSession session) throws Exception {
195         return session.getAttribute("sent").toString();
196     }
197 
198     public static class ClientIoHandler extends IoHandlerAdapter {
199         @Override
200         public void sessionCreated(IoSession session) throws Exception {
201             super.sessionCreated(session);
202             session.setAttribute("pos", new Integer(0));
203             session.setAttribute("received", new StringBuffer());
204             session.setAttribute("sent", new StringBuffer());
205             session.setAttribute("lock", new Object());
206         }
207 
208         @Override
209         public void messageReceived(IoSession session, Object message)
210                 throws Exception {
211             IoBuffer buffer = (IoBuffer) message;
212             byte[] data = new byte[buffer.remaining()];
213             buffer.get(data);
214             Object lock = session.getAttribute("lock");
215             synchronized (lock) {
216                 StringBuffer sb = (StringBuffer) session
217                         .getAttribute("received");
218                 sb.append(new String(data, "ASCII"));
219                 lock.notifyAll();
220             }
221         }
222 
223         @Override
224         public void messageSent(IoSession session, Object message)
225                 throws Exception {
226             IoBuffer buffer = (IoBuffer) message;
227             buffer.rewind();
228             byte[] data = new byte[buffer.remaining()];
229             buffer.get(data);
230             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
231             sb.append(new String(data, "ASCII"));
232         }
233 
234     }
235 
236     private static class ServerIoHandler extends IoHandlerAdapter {
237         @Override
238         public void messageReceived(IoSession session, Object message)
239                 throws Exception {
240             // Just echo the received bytes.
241             IoBuffer rb = (IoBuffer) message;
242             IoBuffer wb = IoBuffer.allocate(rb.remaining());
243             wb.put(rb);
244             wb.flip();
245             session.write(wb);
246         }
247     }
248 }