001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport;
021
022import java.net.SocketAddress;
023
024import org.apache.mina.core.buffer.IoBuffer;
025import org.apache.mina.core.future.ConnectFuture;
026import org.apache.mina.core.service.IoAcceptor;
027import org.apache.mina.core.service.IoHandler;
028import org.apache.mina.core.service.IoHandlerAdapter;
029import org.apache.mina.core.service.TransportMetadata;
030import org.apache.mina.core.session.IoSession;
031import org.junit.After;
032import org.junit.Before;
033import org.junit.Test;
034
035import static org.junit.Assert.assertEquals;
036import static org.junit.Assert.assertFalse;
037import static org.junit.Assert.assertTrue;
038
039/**
040 * Abstract base class for testing suspending and resuming reads and
041 * writes.
042 *
043 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
044 */
045public abstract class AbstractTrafficControlTest {
046
047    protected int port;
048
049    protected IoAcceptor acceptor;
050
051    protected TransportMetadata transportType;
052
053    public AbstractTrafficControlTest(IoAcceptor acceptor) {
054        this.acceptor = acceptor;
055    }
056
057    @Before
058    public void setUp() throws Exception {
059        acceptor.setHandler(new ServerIoHandler());
060        acceptor.bind(createServerSocketAddress(0));
061        port = getPort(acceptor.getLocalAddress());
062    }
063
064    @After
065    public void tearDown() throws Exception {
066        acceptor.unbind();
067        acceptor.dispose();
068    }
069
070    protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception;
071
072    protected abstract SocketAddress createServerSocketAddress(int port);
073
074    protected abstract int getPort(SocketAddress address);
075
076    @Test
077    public void testSuspendResumeReadWrite() throws Exception {
078        ConnectFuture future = connect(port, new ClientIoHandler());
079        future.awaitUninterruptibly();
080        IoSession session = future.getSession();
081
082        // We wait for the sessionCreated() event is fired because we
083        // cannot guarantee that it is invoked already.
084        while (session.getAttribute("lock") == null) {
085            Thread.yield();
086        }
087
088        Object lock = session.getAttribute("lock");
089        synchronized (lock) {
090
091            write(session, "1");
092            assertEquals('1', read(session));
093            assertEquals("1", getReceived(session));
094            assertEquals("1", getSent(session));
095
096            session.suspendRead();
097
098            Thread.sleep(100);
099
100            write(session, "2");
101            assertFalse(canRead(session));
102            assertEquals("1", getReceived(session));
103            assertEquals("12", getSent(session));
104
105            session.suspendWrite();
106
107            Thread.sleep(100);
108
109            write(session, "3");
110            assertFalse(canRead(session));
111            assertEquals("1", getReceived(session));
112            assertEquals("12", getSent(session));
113
114            session.resumeRead();
115
116            Thread.sleep(100);
117
118            write(session, "4");
119            assertEquals('2', read(session));
120            assertEquals("12", getReceived(session));
121            assertEquals("12", getSent(session));
122
123            session.resumeWrite();
124
125            Thread.sleep(100);
126
127            assertEquals('3', read(session));
128            assertEquals('4', read(session));
129
130            write(session, "5");
131            assertEquals('5', read(session));
132            assertEquals("12345", getReceived(session));
133            assertEquals("12345", getSent(session));
134
135            session.suspendWrite();
136
137            Thread.sleep(100);
138
139            write(session, "6");
140            assertFalse(canRead(session));
141            assertEquals("12345", getReceived(session));
142            assertEquals("12345", getSent(session));
143
144            session.suspendRead();
145            session.resumeWrite();
146
147            Thread.sleep(100);
148
149            write(session, "7");
150            assertFalse(canRead(session));
151            assertEquals("12345", getReceived(session));
152            assertEquals("1234567", getSent(session));
153
154            session.resumeRead();
155
156            Thread.sleep(100);
157
158            assertEquals('6', read(session));
159            assertEquals('7', read(session));
160
161            assertEquals("1234567", getReceived(session));
162            assertEquals("1234567", getSent(session));
163
164        }
165
166        session.close(true).awaitUninterruptibly();
167    }
168
169    private void write(IoSession session, String s) throws Exception {
170        session.write(IoBuffer.wrap(s.getBytes("ASCII")));
171    }
172
173    private int read(IoSession session) throws Exception {
174        int pos = ((Integer) session.getAttribute("pos")).intValue();
175        for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
176            Object lock = session.getAttribute("lock");
177            lock.wait(200);
178        }
179        session.setAttribute("pos", new Integer(pos + 1));
180        String received = getReceived(session);
181        assertTrue(received.length() > pos);
182        return getReceived(session).charAt(pos);
183    }
184
185    private boolean canRead(IoSession session) throws Exception {
186        int pos = ((Integer) session.getAttribute("pos")).intValue();
187        Object lock = session.getAttribute("lock");
188        lock.wait(250);
189        String received = getReceived(session);
190        return pos < received.length();
191    }
192
193    private String getReceived(IoSession session) throws Exception {
194        return session.getAttribute("received").toString();
195    }
196
197    private String getSent(IoSession session) throws Exception {
198        return session.getAttribute("sent").toString();
199    }
200
201    private static class ClientIoHandler extends IoHandlerAdapter {
202        /**
203         * Default constructor
204         */
205        public ClientIoHandler() {
206            super();
207        }
208
209        @Override
210        public void sessionCreated(IoSession session) throws Exception {
211            super.sessionCreated(session);
212            session.setAttribute("pos", new Integer(0));
213            session.setAttribute("received", new StringBuffer());
214            session.setAttribute("sent", new StringBuffer());
215            session.setAttribute("lock", new Object());
216        }
217
218        @Override
219        public void messageReceived(IoSession session, Object message) throws Exception {
220            IoBuffer buffer = (IoBuffer) message;
221            byte[] data = new byte[buffer.remaining()];
222            buffer.get(data);
223            Object lock = session.getAttribute("lock");
224            synchronized (lock) {
225                StringBuffer sb = (StringBuffer) session.getAttribute("received");
226                sb.append(new String(data, "ASCII"));
227                lock.notifyAll();
228            }
229        }
230
231        @Override
232        public void messageSent(IoSession session, Object message) throws Exception {
233            IoBuffer buffer = (IoBuffer) message;
234            buffer.rewind();
235            byte[] data = new byte[buffer.remaining()];
236            buffer.get(data);
237            StringBuffer sb = (StringBuffer) session.getAttribute("sent");
238            sb.append(new String(data, "ASCII"));
239        }
240
241    }
242
243    private static class ServerIoHandler extends IoHandlerAdapter {
244        /**
245         * Default constructor
246         */
247        public ServerIoHandler() {
248            super();
249        }
250
251        @Override
252        public void messageReceived(IoSession session, Object message) throws Exception {
253            // Just echo the received bytes.
254            IoBuffer rb = (IoBuffer) message;
255            IoBuffer wb = IoBuffer.allocate(rb.remaining());
256            wb.put(rb);
257            wb.flip();
258            session.write(wb);
259        }
260    }
261}