1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24
25 import junit.framework.Assert;
26 import junit.framework.TestCase;
27
28 import org.apache.mina.core.buffer.IoBuffer;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.service.IoAcceptor;
31 import org.apache.mina.core.service.IoConnector;
32 import org.apache.mina.core.service.IoHandlerAdapter;
33 import org.apache.mina.core.session.IoSession;
34 import org.apache.mina.filter.executor.ExecutorFilter;
35
36
37
38
39
40
41
42 public class VmPipeEventOrderTest extends TestCase {
43 public void testServerToClient() throws Exception {
44 IoAcceptor acceptor = new VmPipeAcceptor();
45
46
47 IoConnector connector = new VmPipeConnector();
48
49
50 acceptor.setHandler(new IoHandlerAdapter() {
51 @Override
52 public void sessionOpened(IoSession session) throws Exception {
53 session.write("B");
54 }
55
56 @Override
57 public void messageSent(IoSession session, Object message)
58 throws Exception {
59 session.close();
60 }
61 });
62
63 acceptor.bind(new VmPipeAddress(1));
64
65 final StringBuffer actual = new StringBuffer();
66
67 connector.setHandler(new IoHandlerAdapter() {
68
69 @Override
70 public void messageReceived(IoSession session, Object message)
71 throws Exception {
72 actual.append(message);
73 }
74
75 @Override
76 public void sessionClosed(IoSession session) throws Exception {
77 actual.append("C");
78 }
79
80 @Override
81 public void sessionOpened(IoSession session) throws Exception {
82 actual.append("A");
83 }
84
85 });
86
87 ConnectFuture future = connector.connect(new VmPipeAddress(1));
88
89 future.awaitUninterruptibly();
90 future.getSession().getCloseFuture().awaitUninterruptibly();
91 acceptor.dispose();
92
93
94
95 while (actual.indexOf("C") < 0) {
96 Thread.yield();
97 }
98
99 Assert.assertEquals("ABC", actual.toString());
100 }
101
102 public void testClientToServer() throws Exception {
103 IoAcceptor acceptor = new VmPipeAcceptor();
104
105
106 IoConnector connector = new VmPipeConnector();
107
108
109 final StringBuffer actual = new StringBuffer();
110
111 acceptor.setHandler(new IoHandlerAdapter() {
112
113 @Override
114 public void messageReceived(IoSession session, Object message)
115 throws Exception {
116 actual.append(message);
117 }
118
119 @Override
120 public void sessionClosed(IoSession session) throws Exception {
121 actual.append("C");
122 }
123
124 @Override
125 public void sessionOpened(IoSession session) throws Exception {
126 actual.append("A");
127 }
128
129 });
130
131 acceptor.bind(new VmPipeAddress(1));
132
133 connector.setHandler(new IoHandlerAdapter() {
134 @Override
135 public void sessionOpened(IoSession session) throws Exception {
136 session.write("B");
137 }
138
139 @Override
140 public void messageSent(IoSession session, Object message)
141 throws Exception {
142 session.close();
143 }
144 });
145
146 ConnectFuture future = connector.connect(new VmPipeAddress(1));
147
148 future.awaitUninterruptibly();
149 future.getSession().getCloseFuture().awaitUninterruptibly();
150 acceptor.dispose();
151 connector.dispose();
152
153
154
155 while (actual.indexOf("C") < 0) {
156 Thread.yield();
157 }
158
159 Assert.assertEquals("ABC", actual.toString());
160 }
161
162 public void testSessionCreated() throws Exception {
163 final Semaphore semaphore = new Semaphore(0);
164 final StringBuffer stringBuffer = new StringBuffer();
165 VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
166 final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
167 vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
168 @Override
169 public void sessionCreated(IoSession session) throws Exception {
170
171
172
173
174 Thread.sleep(1000);
175 stringBuffer.append("A");
176 }
177
178 @Override
179 public void sessionOpened(IoSession session) throws Exception {
180 stringBuffer.append("B");
181 }
182
183 @Override
184 public void messageReceived(IoSession session, Object message)
185 throws Exception {
186 stringBuffer.append("C");
187 }
188
189 @Override
190 public void sessionClosed(IoSession session) throws Exception {
191 stringBuffer.append("D");
192 semaphore.release();
193 }
194 });
195 vmPipeAcceptor.bind(vmPipeAddress);
196
197 final VmPipeConnector vmPipeConnector = new VmPipeConnector();
198 vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter());
199 vmPipeConnector.setHandler(new IoHandlerAdapter());
200 ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
201 connectFuture.awaitUninterruptibly();
202 connectFuture.getSession().write(IoBuffer.wrap(new byte[1]));
203 connectFuture.getSession().closeOnFlush().awaitUninterruptibly();
204
205 semaphore.tryAcquire(1, TimeUnit.SECONDS);
206 vmPipeAcceptor.unbind(vmPipeAddress);
207 Assert.assertEquals(1, connectFuture.getSession().getWrittenBytes());
208 Assert.assertEquals("ABCD", stringBuffer.toString());
209 }
210 }