1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import static org.junit.Assert.assertEquals;
23
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.future.ConnectFuture;
29 import org.apache.mina.core.service.IoAcceptor;
30 import org.apache.mina.core.service.IoConnector;
31 import org.apache.mina.core.service.IoHandlerAdapter;
32 import org.apache.mina.core.session.IoSession;
33 import org.apache.mina.filter.executor.ExecutorFilter;
34 import org.junit.Test;
35
36
37
38
39
40
41 public class VmPipeEventOrderTest {
42 @Test
43 public void testServerToClient() throws Exception {
44 IoAcceptor acceptor = new VmPipeAcceptor();
45 IoConnector connector = new VmPipeConnector();
46
47 acceptor.setHandler(new IoHandlerAdapter() {
48 @Override
49 public void sessionOpened(IoSession session) throws Exception {
50 session.write("B");
51 }
52
53 @Override
54 public void messageSent(IoSession session, Object message)
55 throws Exception {
56 session.close(true);
57 }
58 });
59
60 acceptor.bind(new VmPipeAddress(1));
61
62 final StringBuffer actual = new StringBuffer();
63
64 connector.setHandler(new IoHandlerAdapter() {
65
66 @Override
67 public void messageReceived(IoSession session, Object message)
68 throws Exception {
69 actual.append(message);
70 }
71
72 @Override
73 public void sessionClosed(IoSession session) throws Exception {
74 actual.append("C");
75 }
76
77 @Override
78 public void sessionOpened(IoSession session) throws Exception {
79 actual.append("A");
80 }
81
82 });
83
84 ConnectFuture future = connector.connect(new VmPipeAddress(1));
85
86 future.awaitUninterruptibly();
87 future.getSession().getCloseFuture().awaitUninterruptibly();
88 acceptor.dispose();
89
90
91
92 while (actual.indexOf("C") < 0) {
93 Thread.yield();
94 }
95
96 assertEquals("ABC", actual.toString());
97 }
98
99 @Test
100 public void testClientToServer() throws Exception {
101 IoAcceptor acceptor = new VmPipeAcceptor();
102 IoConnector connector = new VmPipeConnector();
103
104 final StringBuffer actual = new StringBuffer();
105
106 acceptor.setHandler(new IoHandlerAdapter() {
107
108 @Override
109 public void messageReceived(IoSession session, Object message)
110 throws Exception {
111 actual.append(message);
112 }
113
114 @Override
115 public void sessionClosed(IoSession session) throws Exception {
116 actual.append("C");
117 }
118
119 @Override
120 public void sessionOpened(IoSession session) throws Exception {
121 actual.append("A");
122 }
123
124 });
125
126 acceptor.bind(new VmPipeAddress(1));
127
128 connector.setHandler(new IoHandlerAdapter() {
129 @Override
130 public void sessionOpened(IoSession session) throws Exception {
131 session.write("B");
132 }
133
134 @Override
135 public void messageSent(IoSession session, Object message)
136 throws Exception {
137 session.close(true);
138 }
139 });
140
141 ConnectFuture future = connector.connect(new VmPipeAddress(1));
142
143 future.awaitUninterruptibly();
144 future.getSession().getCloseFuture().awaitUninterruptibly();
145 acceptor.dispose();
146 connector.dispose();
147
148
149
150 while (actual.indexOf("C") < 0) {
151 Thread.yield();
152 }
153
154 assertEquals("ABC", actual.toString());
155 }
156
157 @Test
158 public void testSessionCreated() throws Exception {
159 final Semaphore semaphore = new Semaphore(0);
160 final StringBuffer stringBuffer = new StringBuffer();
161 VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
162 final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
163 vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
164 @Override
165 public void sessionCreated(IoSession session) throws Exception {
166
167
168
169
170 Thread.sleep(1000);
171 stringBuffer.append("A");
172 }
173
174 @Override
175 public void sessionOpened(IoSession session) throws Exception {
176 stringBuffer.append("B");
177 }
178
179 @Override
180 public void messageReceived(IoSession session, Object message)
181 throws Exception {
182 stringBuffer.append("C");
183 }
184
185 @Override
186 public void sessionClosed(IoSession session) throws Exception {
187 stringBuffer.append("D");
188 semaphore.release();
189 }
190 });
191 vmPipeAcceptor.bind(vmPipeAddress);
192
193 final VmPipeConnector vmPipeConnector = new VmPipeConnector();
194 vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter());
195 vmPipeConnector.setHandler(new IoHandlerAdapter());
196 ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
197 connectFuture.awaitUninterruptibly();
198 connectFuture.getSession().write(IoBuffer.wrap(new byte[1])).awaitUninterruptibly();
199 connectFuture.getSession().close(false).awaitUninterruptibly();
200
201 semaphore.tryAcquire(1, TimeUnit.SECONDS);
202 vmPipeAcceptor.unbind(vmPipeAddress);
203 assertEquals(1, connectFuture.getSession().getWrittenBytes());
204 assertEquals("ABCD", stringBuffer.toString());
205 }
206 }