001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.vmpipe;
021
022import static org.junit.Assert.assertEquals;
023
024import java.util.concurrent.Semaphore;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.mina.core.buffer.IoBuffer;
028import org.apache.mina.core.future.ConnectFuture;
029import org.apache.mina.core.service.IoAcceptor;
030import org.apache.mina.core.service.IoConnector;
031import org.apache.mina.core.service.IoHandlerAdapter;
032import org.apache.mina.core.session.IoSession;
033import org.apache.mina.filter.executor.ExecutorFilter;
034import org.junit.Test;
035
036/**
037 * Makes sure the order of events are correct.
038 *
039 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
040 */
041public class VmPipeEventOrderTest {
042    @Test
043    public void testServerToClient() throws Exception {
044        IoAcceptor acceptor = new VmPipeAcceptor();
045        IoConnector connector = new VmPipeConnector();
046
047        acceptor.setHandler(new IoHandlerAdapter() {
048            @Override
049            public void sessionOpened(IoSession session) throws Exception {
050                session.write("B");
051            }
052
053            @Override
054            public void messageSent(IoSession session, Object message) throws Exception {
055                session.close(true);
056            }
057        });
058
059        acceptor.bind(new VmPipeAddress(1));
060
061        final StringBuffer actual = new StringBuffer();
062
063        connector.setHandler(new IoHandlerAdapter() {
064
065            @Override
066            public void messageReceived(IoSession session, Object message) throws Exception {
067                actual.append(message);
068            }
069
070            @Override
071            public void sessionClosed(IoSession session) throws Exception {
072                actual.append("C");
073            }
074
075            @Override
076            public void sessionOpened(IoSession session) throws Exception {
077                actual.append("A");
078            }
079
080        });
081
082        ConnectFuture future = connector.connect(new VmPipeAddress(1));
083
084        future.awaitUninterruptibly();
085        future.getSession().getCloseFuture().awaitUninterruptibly();
086        acceptor.dispose();
087
088        // sessionClosed() might not be invoked yet
089        // even if the connection is closed.
090        while (actual.indexOf("C") < 0) {
091            Thread.yield();
092        }
093
094        assertEquals("ABC", actual.toString());
095    }
096
097    @Test
098    public void testClientToServer() throws Exception {
099        IoAcceptor acceptor = new VmPipeAcceptor();
100        IoConnector connector = new VmPipeConnector();
101
102        final StringBuffer actual = new StringBuffer();
103
104        acceptor.setHandler(new IoHandlerAdapter() {
105
106            @Override
107            public void messageReceived(IoSession session, Object message) throws Exception {
108                actual.append(message);
109            }
110
111            @Override
112            public void sessionClosed(IoSession session) throws Exception {
113                actual.append("C");
114            }
115
116            @Override
117            public void sessionOpened(IoSession session) throws Exception {
118                actual.append("A");
119            }
120
121        });
122
123        acceptor.bind(new VmPipeAddress(1));
124
125        connector.setHandler(new IoHandlerAdapter() {
126            @Override
127            public void sessionOpened(IoSession session) throws Exception {
128                session.write("B");
129            }
130
131            @Override
132            public void messageSent(IoSession session, Object message) throws Exception {
133                session.close(true);
134            }
135        });
136
137        ConnectFuture future = connector.connect(new VmPipeAddress(1));
138
139        future.awaitUninterruptibly();
140        future.getSession().getCloseFuture().awaitUninterruptibly();
141        acceptor.dispose();
142        connector.dispose();
143
144        // sessionClosed() might not be invoked yet
145        // even if the connection is closed.
146        while (actual.indexOf("C") < 0) {
147            Thread.yield();
148        }
149
150        assertEquals("ABC", actual.toString());
151    }
152
153    @Test
154    public void testSessionCreated() throws Exception {
155        final Semaphore semaphore = new Semaphore(0);
156        final StringBuffer stringBuffer = new StringBuffer();
157        VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
158        final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
159        vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
160            @Override
161            public void sessionCreated(IoSession session) throws Exception {
162                // pretend we are doing some time-consuming work. For
163                // performance reasons, you would never want to do time
164                // consuming work in sessionCreated.
165                // However, this increases the likelihood of the timing bug.
166                Thread.sleep(1000);
167                stringBuffer.append("A");
168            }
169
170            @Override
171            public void sessionOpened(IoSession session) throws Exception {
172                stringBuffer.append("B");
173            }
174
175            @Override
176            public void messageReceived(IoSession session, Object message) throws Exception {
177                stringBuffer.append("C");
178            }
179
180            @Override
181            public void sessionClosed(IoSession session) throws Exception {
182                stringBuffer.append("D");
183                semaphore.release();
184            }
185        });
186        vmPipeAcceptor.bind(vmPipeAddress);
187
188        final VmPipeConnector vmPipeConnector = new VmPipeConnector();
189        vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter());
190        vmPipeConnector.setHandler(new IoHandlerAdapter());
191        ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
192        connectFuture.awaitUninterruptibly();
193        connectFuture.getSession().write(IoBuffer.wrap(new byte[1])).awaitUninterruptibly();
194        connectFuture.getSession().close(false).awaitUninterruptibly();
195
196        semaphore.tryAcquire(1, TimeUnit.SECONDS);
197        vmPipeAcceptor.unbind(vmPipeAddress);
198        assertEquals(1, connectFuture.getSession().getWrittenBytes());
199        assertEquals("ABCD", stringBuffer.toString());
200    }
201}