View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  
24  import org.apache.mina.core.buffer.IoBuffer;
25  import org.apache.mina.core.future.ConnectFuture;
26  import org.apache.mina.core.service.IoAcceptor;
27  import org.apache.mina.core.service.IoHandler;
28  import org.apache.mina.core.service.IoHandlerAdapter;
29  import org.apache.mina.core.service.TransportMetadata;
30  import org.apache.mina.core.session.IoSession;
31  import org.junit.After;
32  import org.junit.Before;
33  import org.junit.Test;
34  
35  import static org.junit.Assert.assertEquals;
36  import static org.junit.Assert.assertFalse;
37  import static org.junit.Assert.assertTrue;
38  
39  /**
40   * Abstract base class for testing suspending and resuming reads and
41   * writes.
42   *
43   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
44   */
45  public abstract class AbstractTrafficControlTest {
46  
47      protected int port;
48      protected IoAcceptor acceptor;
49      protected TransportMetadata transportType;
50  
51      public AbstractTrafficControlTest(IoAcceptor acceptor) {
52          this.acceptor = acceptor;
53      }
54  
55      @Before
56      public void setUp() throws Exception {
57          acceptor.setHandler(new ServerIoHandler());
58          acceptor.bind(createServerSocketAddress(0));
59          port = getPort(acceptor.getLocalAddress());
60      }
61  
62      @After
63      public void tearDown() throws Exception {
64          acceptor.unbind();
65          acceptor.dispose();
66      }
67  
68      protected abstract ConnectFuture connect(int port, IoHandler handler)
69              throws Exception;
70  
71      protected abstract SocketAddress createServerSocketAddress(int port);
72      protected abstract int getPort(SocketAddress address);
73  
74      @Test
75      public void testSuspendResumeReadWrite() throws Exception {
76          ConnectFuture future = connect(port, new ClientIoHandler());
77          future.awaitUninterruptibly();
78          IoSession session = future.getSession();
79  
80          // We wait for the sessionCreated() event is fired because we
81          // cannot guarantee that it is invoked already.
82          while (session.getAttribute("lock") == null) {
83              Thread.yield();
84          }
85          
86          Object lock = session.getAttribute("lock");
87          synchronized (lock) {
88  
89              write(session, "1");
90              assertEquals('1', read(session));
91              assertEquals("1", getReceived(session));
92              assertEquals("1", getSent(session));
93  
94              session.suspendRead();
95  
96              Thread.sleep(100);
97  
98              write(session, "2");
99              assertFalse(canRead(session));
100             assertEquals("1", getReceived(session));
101             assertEquals("12", getSent(session));
102 
103             session.suspendWrite();
104 
105             Thread.sleep(100);
106 
107             write(session, "3");
108             assertFalse(canRead(session));
109             assertEquals("1", getReceived(session));
110             assertEquals("12", getSent(session));
111 
112             session.resumeRead();
113 
114             Thread.sleep(100);
115 
116             write(session, "4");
117             assertEquals('2', read(session));
118             assertEquals("12", getReceived(session));
119             assertEquals("12", getSent(session));
120 
121             session.resumeWrite();
122 
123             Thread.sleep(100);
124 
125             assertEquals('3', read(session));
126             assertEquals('4', read(session));
127 
128             write(session, "5");
129             assertEquals('5', read(session));
130             assertEquals("12345", getReceived(session));
131             assertEquals("12345", getSent(session));
132 
133             session.suspendWrite();
134 
135             Thread.sleep(100);
136 
137             write(session, "6");
138             assertFalse(canRead(session));
139             assertEquals("12345", getReceived(session));
140             assertEquals("12345", getSent(session));
141 
142             session.suspendRead();
143             session.resumeWrite();
144 
145             Thread.sleep(100);
146 
147             write(session, "7");
148             assertFalse(canRead(session));
149             assertEquals("12345", getReceived(session));
150             assertEquals("1234567", getSent(session));
151 
152             session.resumeRead();
153 
154             Thread.sleep(100);
155 
156             assertEquals('6', read(session));
157             assertEquals('7', read(session));
158 
159             assertEquals("1234567", getReceived(session));
160             assertEquals("1234567", getSent(session));
161 
162         }
163 
164         session.close(true).awaitUninterruptibly();
165     }
166 
167     private void write(IoSession session, String s) throws Exception {
168         session.write(IoBuffer.wrap(s.getBytes("ASCII")));
169     }
170 
171     private int read(IoSession session) throws Exception {
172         int pos = ((Integer) session.getAttribute("pos")).intValue();
173         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
174             Object lock = session.getAttribute("lock");
175             lock.wait(200);
176         }
177         session.setAttribute("pos", new Integer(pos + 1));
178         String received = getReceived(session);
179         assertTrue(received.length() > pos);
180         return getReceived(session).charAt(pos);
181     }
182 
183     private boolean canRead(IoSession session) throws Exception {
184         int pos = ((Integer) session.getAttribute("pos")).intValue();
185         Object lock = session.getAttribute("lock");
186         lock.wait(250);
187         String received = getReceived(session);
188         return pos < received.length();
189     }
190 
191     private String getReceived(IoSession session) throws Exception {
192         return session.getAttribute("received").toString();
193     }
194 
195     private String getSent(IoSession session) throws Exception {
196         return session.getAttribute("sent").toString();
197     }
198 
199     private static class ClientIoHandler extends IoHandlerAdapter {
200         /**
201          * Default constructor
202          */
203         public ClientIoHandler() {
204             super();
205         }
206         
207         @Override
208         public void sessionCreated(IoSession session) throws Exception {
209             super.sessionCreated(session);
210             session.setAttribute("pos", new Integer(0));
211             session.setAttribute("received", new StringBuffer());
212             session.setAttribute("sent", new StringBuffer());
213             session.setAttribute("lock", new Object());
214         }
215 
216         @Override
217         public void messageReceived(IoSession session, Object message)
218                 throws Exception {
219             IoBuffer buffer = (IoBuffer) message;
220             byte[] data = new byte[buffer.remaining()];
221             buffer.get(data);
222             Object lock = session.getAttribute("lock");
223             synchronized (lock) {
224                 StringBuffer sb = (StringBuffer) session
225                         .getAttribute("received");
226                 sb.append(new String(data, "ASCII"));
227                 lock.notifyAll();
228             }
229         }
230 
231         @Override
232         public void messageSent(IoSession session, Object message)
233                 throws Exception {
234             IoBuffer buffer = (IoBuffer) message;
235             buffer.rewind();
236             byte[] data = new byte[buffer.remaining()];
237             buffer.get(data);
238             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
239             sb.append(new String(data, "ASCII"));
240         }
241 
242     }
243 
244     private static class ServerIoHandler extends IoHandlerAdapter {
245         /**
246          * Default constructor
247          */
248         public ServerIoHandler() {
249             super();
250         }
251         
252         @Override
253         public void messageReceived(IoSession session, Object message)
254                 throws Exception {
255             // Just echo the received bytes.
256             IoBuffer rb = (IoBuffer) message;
257             IoBuffer wb = IoBuffer.allocate(rb.remaining());
258             wb.put(rb);
259             wb.flip();
260             session.write(wb);
261         }
262     }
263 }