001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.vmpipe; 021 022import static org.junit.Assert.assertEquals; 023 024import java.util.concurrent.Semaphore; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.mina.core.buffer.IoBuffer; 028import org.apache.mina.core.future.ConnectFuture; 029import org.apache.mina.core.service.IoAcceptor; 030import org.apache.mina.core.service.IoConnector; 031import org.apache.mina.core.service.IoHandlerAdapter; 032import org.apache.mina.core.session.IoSession; 033import org.apache.mina.filter.executor.ExecutorFilter; 034import org.junit.Test; 035 036/** 037 * Makes sure the order of events are correct. 038 * 039 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 040 */ 041public class VmPipeEventOrderTest { 042 @Test 043 public void testServerToClient() throws Exception { 044 IoAcceptor acceptor = new VmPipeAcceptor(); 045 IoConnector connector = new VmPipeConnector(); 046 047 acceptor.setHandler(new IoHandlerAdapter() { 048 @Override 049 public void sessionOpened(IoSession session) throws Exception { 050 session.write("B"); 051 } 052 053 @Override 054 public void messageSent(IoSession session, Object message) throws Exception { 055 session.close(true); 056 } 057 }); 058 059 acceptor.bind(new VmPipeAddress(1)); 060 061 final StringBuffer actual = new StringBuffer(); 062 063 connector.setHandler(new IoHandlerAdapter() { 064 065 @Override 066 public void messageReceived(IoSession session, Object message) throws Exception { 067 actual.append(message); 068 } 069 070 @Override 071 public void sessionClosed(IoSession session) throws Exception { 072 actual.append("C"); 073 } 074 075 @Override 076 public void sessionOpened(IoSession session) throws Exception { 077 actual.append("A"); 078 } 079 080 }); 081 082 ConnectFuture future = connector.connect(new VmPipeAddress(1)); 083 084 future.awaitUninterruptibly(); 085 future.getSession().getCloseFuture().awaitUninterruptibly(); 086 acceptor.dispose(); 087 088 // sessionClosed() might not be invoked yet 089 // even if the connection is closed. 090 while (actual.indexOf("C") < 0) { 091 Thread.yield(); 092 } 093 094 assertEquals("ABC", actual.toString()); 095 } 096 097 @Test 098 public void testClientToServer() throws Exception { 099 IoAcceptor acceptor = new VmPipeAcceptor(); 100 IoConnector connector = new VmPipeConnector(); 101 102 final StringBuffer actual = new StringBuffer(); 103 104 acceptor.setHandler(new IoHandlerAdapter() { 105 106 @Override 107 public void messageReceived(IoSession session, Object message) throws Exception { 108 actual.append(message); 109 } 110 111 @Override 112 public void sessionClosed(IoSession session) throws Exception { 113 actual.append("C"); 114 } 115 116 @Override 117 public void sessionOpened(IoSession session) throws Exception { 118 actual.append("A"); 119 } 120 121 }); 122 123 acceptor.bind(new VmPipeAddress(1)); 124 125 connector.setHandler(new IoHandlerAdapter() { 126 @Override 127 public void sessionOpened(IoSession session) throws Exception { 128 session.write("B"); 129 } 130 131 @Override 132 public void messageSent(IoSession session, Object message) throws Exception { 133 session.close(true); 134 } 135 }); 136 137 ConnectFuture future = connector.connect(new VmPipeAddress(1)); 138 139 future.awaitUninterruptibly(); 140 future.getSession().getCloseFuture().awaitUninterruptibly(); 141 acceptor.dispose(); 142 connector.dispose(); 143 144 // sessionClosed() might not be invoked yet 145 // even if the connection is closed. 146 while (actual.indexOf("C") < 0) { 147 Thread.yield(); 148 } 149 150 assertEquals("ABC", actual.toString()); 151 } 152 153 @Test 154 public void testSessionCreated() throws Exception { 155 final Semaphore semaphore = new Semaphore(0); 156 final StringBuffer stringBuffer = new StringBuffer(); 157 VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor(); 158 final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345); 159 vmPipeAcceptor.setHandler(new IoHandlerAdapter() { 160 @Override 161 public void sessionCreated(IoSession session) throws Exception { 162 // pretend we are doing some time-consuming work. For 163 // performance reasons, you would never want to do time 164 // consuming work in sessionCreated. 165 // However, this increases the likelihood of the timing bug. 166 Thread.sleep(1000); 167 stringBuffer.append("A"); 168 } 169 170 @Override 171 public void sessionOpened(IoSession session) throws Exception { 172 stringBuffer.append("B"); 173 } 174 175 @Override 176 public void messageReceived(IoSession session, Object message) throws Exception { 177 stringBuffer.append("C"); 178 } 179 180 @Override 181 public void sessionClosed(IoSession session) throws Exception { 182 stringBuffer.append("D"); 183 semaphore.release(); 184 } 185 }); 186 vmPipeAcceptor.bind(vmPipeAddress); 187 188 final VmPipeConnector vmPipeConnector = new VmPipeConnector(); 189 vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter()); 190 vmPipeConnector.setHandler(new IoHandlerAdapter()); 191 ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress); 192 connectFuture.awaitUninterruptibly(); 193 connectFuture.getSession().write(IoBuffer.wrap(new byte[1])).awaitUninterruptibly(); 194 connectFuture.getSession().close(false).awaitUninterruptibly(); 195 196 semaphore.tryAcquire(1, TimeUnit.SECONDS); 197 vmPipeAcceptor.unbind(vmPipeAddress); 198 assertEquals(1, connectFuture.getSession().getWrittenBytes()); 199 assertEquals("ABCD", stringBuffer.toString()); 200 } 201}