1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  
24  import junit.framework.TestCase;
25  
26  import org.apache.mina.core.buffer.IoBuffer;
27  import org.apache.mina.core.future.ConnectFuture;
28  import org.apache.mina.core.service.IoAcceptor;
29  import org.apache.mina.core.service.IoHandler;
30  import org.apache.mina.core.service.IoHandlerAdapter;
31  import org.apache.mina.core.service.TransportMetadata;
32  import org.apache.mina.core.session.IoSession;
33  
34  /**
35   * Abstract base class for testing suspending and resuming reads and
36   * writes.
37   *
38   * @author The Apache MINA Project (dev@mina.apache.org)
39   * @version $Rev$, $Date$
40   */
41  public abstract class AbstractTrafficControlTest extends TestCase {
42  
43      protected int port;
44      protected IoAcceptor acceptor;
45      protected TransportMetadata transportType;
46  
47      public AbstractTrafficControlTest(IoAcceptor acceptor) {
48          this.acceptor = acceptor;
49      }
50  
51      @Override
52      protected void setUp() throws Exception {
53          super.setUp();
54  
55          acceptor.setHandler(new ServerIoHandler());
56          acceptor.bind(createServerSocketAddress(0));
57          port = getPort(acceptor.getLocalAddress());
58      }
59  
60      @Override
61      protected void tearDown() throws Exception {
62          super.tearDown();
63          acceptor.unbind();
64          acceptor.dispose();
65      }
66  
67      protected abstract ConnectFuture connect(int port, IoHandler handler)
68              throws Exception;
69  
70      protected abstract SocketAddress createServerSocketAddress(int port);
71      protected abstract int getPort(SocketAddress address);
72  
73      public void testSuspendResumeReadWrite() throws Exception {
74          ConnectFuture future = connect(port, new ClientIoHandler());
75          future.awaitUninterruptibly();
76          IoSession session = future.getSession();
77  
78          // We wait for the sessionCreated() event is fired because we
79          // cannot guarantee that it is invoked already.
80          while (session.getAttribute("lock") == null) {
81              Thread.yield();
82          }
83  
84          Object lock = session.getAttribute("lock");
85          synchronized (lock) {
86  
87              write(session, "1");
88              assertEquals('1', read(session));
89              assertEquals("1", getReceived(session));
90              assertEquals("1", getSent(session));
91  
92              session.suspendRead();
93  
94              Thread.sleep(100);
95  
96              write(session, "2");
97              assertFalse(canRead(session));
98              assertEquals("1", getReceived(session));
99              assertEquals("12", getSent(session));
100 
101             session.suspendWrite();
102 
103             Thread.sleep(100);
104 
105             write(session, "3");
106             assertFalse(canRead(session));
107             assertEquals("1", getReceived(session));
108             assertEquals("12", getSent(session));
109 
110             session.resumeRead();
111 
112             Thread.sleep(100);
113 
114             write(session, "4");
115             assertEquals('2', read(session));
116             assertEquals("12", getReceived(session));
117             assertEquals("12", getSent(session));
118 
119             session.resumeWrite();
120 
121             Thread.sleep(100);
122 
123             assertEquals('3', read(session));
124             assertEquals('4', read(session));
125 
126             write(session, "5");
127             assertEquals('5', read(session));
128             assertEquals("12345", getReceived(session));
129             assertEquals("12345", getSent(session));
130 
131             session.suspendWrite();
132 
133             Thread.sleep(100);
134 
135             write(session, "6");
136             assertFalse(canRead(session));
137             assertEquals("12345", getReceived(session));
138             assertEquals("12345", getSent(session));
139 
140             session.suspendRead();
141             session.resumeWrite();
142 
143             Thread.sleep(100);
144 
145             write(session, "7");
146             assertFalse(canRead(session));
147             assertEquals("12345", getReceived(session));
148             assertEquals("1234567", getSent(session));
149 
150             session.resumeRead();
151 
152             Thread.sleep(100);
153 
154             assertEquals('6', read(session));
155             assertEquals('7', read(session));
156 
157             assertEquals("1234567", getReceived(session));
158             assertEquals("1234567", getSent(session));
159 
160         }
161 
162         session.close().awaitUninterruptibly();
163     }
164 
165     private void write(IoSession session, String s) throws Exception {
166         session.write(IoBuffer.wrap(s.getBytes("ASCII")));
167     }
168 
169     private int read(IoSession session) throws Exception {
170         int pos = ((Integer) session.getAttribute("pos")).intValue();
171         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
172             Object lock = session.getAttribute("lock");
173             lock.wait(200);
174         }
175         session.setAttribute("pos", new Integer(pos + 1));
176         String received = getReceived(session);
177         assertTrue(received.length() > pos);
178         return getReceived(session).charAt(pos);
179     }
180 
181     private boolean canRead(IoSession session) throws Exception {
182         int pos = ((Integer) session.getAttribute("pos")).intValue();
183         Object lock = session.getAttribute("lock");
184         lock.wait(250);
185         String received = getReceived(session);
186         return pos < received.length();
187     }
188 
189     private String getReceived(IoSession session) throws Exception {
190         return session.getAttribute("received").toString();
191     }
192 
193     private String getSent(IoSession session) throws Exception {
194         return session.getAttribute("sent").toString();
195     }
196 
197     public static class ClientIoHandler extends IoHandlerAdapter {
198         @Override
199         public void sessionCreated(IoSession session) throws Exception {
200             super.sessionCreated(session);
201             session.setAttribute("pos", new Integer(0));
202             session.setAttribute("received", new StringBuffer());
203             session.setAttribute("sent", new StringBuffer());
204             session.setAttribute("lock", new Object());
205         }
206 
207         @Override
208         public void messageReceived(IoSession session, Object message)
209                 throws Exception {
210             IoBuffer buffer = (IoBuffer) message;
211             byte[] data = new byte[buffer.remaining()];
212             buffer.get(data);
213             Object lock = session.getAttribute("lock");
214             synchronized (lock) {
215                 StringBuffer sb = (StringBuffer) session
216                         .getAttribute("received");
217                 sb.append(new String(data, "ASCII"));
218                 lock.notifyAll();
219             }
220         }
221 
222         @Override
223         public void messageSent(IoSession session, Object message)
224                 throws Exception {
225             IoBuffer buffer = (IoBuffer) message;
226             buffer.rewind();
227             byte[] data = new byte[buffer.remaining()];
228             buffer.get(data);
229             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
230             sb.append(new String(data, "ASCII"));
231         }
232 
233     }
234 
235     private static class ServerIoHandler extends IoHandlerAdapter {
236         @Override
237         public void messageReceived(IoSession session, Object message)
238                 throws Exception {
239             // Just echo the received bytes.
240             IoBuffer rb = (IoBuffer) message;
241             IoBuffer wb = IoBuffer.allocate(rb.remaining());
242             wb.put(rb);
243             wb.flip();
244             session.write(wb);
245         }
246     }
247 }