1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24
25 import junit.framework.Assert;
26 import junit.framework.TestCase;
27
28 import org.apache.mina.core.buffer.IoBuffer;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.service.IoAcceptor;
31 import org.apache.mina.core.service.IoConnector;
32 import org.apache.mina.core.service.IoHandlerAdapter;
33 import org.apache.mina.core.session.IoSession;
34 import org.apache.mina.filter.executor.ExecutorFilter;
35
36
37
38
39
40
41
42 public class VmPipeEventOrderTest extends TestCase {
43 public void testServerToClient() throws Exception {
44 IoAcceptor acceptor = new VmPipeAcceptor();
45 IoConnector connector = new VmPipeConnector();
46
47 acceptor.setHandler(new IoHandlerAdapter() {
48 @Override
49 public void sessionOpened(IoSession session) throws Exception {
50 session.write("B");
51 }
52
53 @Override
54 public void messageSent(IoSession session, Object message)
55 throws Exception {
56 session.close(true);
57 }
58 });
59
60 acceptor.bind(new VmPipeAddress(1));
61
62 final StringBuffer actual = new StringBuffer();
63
64 connector.setHandler(new IoHandlerAdapter() {
65
66 @Override
67 public void messageReceived(IoSession session, Object message)
68 throws Exception {
69 actual.append(message);
70 }
71
72 @Override
73 public void sessionClosed(IoSession session) throws Exception {
74 actual.append("C");
75 }
76
77 @Override
78 public void sessionOpened(IoSession session) throws Exception {
79 actual.append("A");
80 }
81
82 });
83
84 ConnectFuture future = connector.connect(new VmPipeAddress(1));
85
86 future.awaitUninterruptibly();
87 future.getSession().getCloseFuture().awaitUninterruptibly();
88 acceptor.dispose();
89
90
91
92 while (actual.indexOf("C") < 0) {
93 Thread.yield();
94 }
95
96 Assert.assertEquals("ABC", actual.toString());
97 }
98
99 public void testClientToServer() throws Exception {
100 IoAcceptor acceptor = new VmPipeAcceptor();
101 IoConnector connector = new VmPipeConnector();
102
103 final StringBuffer actual = new StringBuffer();
104
105 acceptor.setHandler(new IoHandlerAdapter() {
106
107 @Override
108 public void messageReceived(IoSession session, Object message)
109 throws Exception {
110 actual.append(message);
111 }
112
113 @Override
114 public void sessionClosed(IoSession session) throws Exception {
115 actual.append("C");
116 }
117
118 @Override
119 public void sessionOpened(IoSession session) throws Exception {
120 actual.append("A");
121 }
122
123 });
124
125 acceptor.bind(new VmPipeAddress(1));
126
127 connector.setHandler(new IoHandlerAdapter() {
128 @Override
129 public void sessionOpened(IoSession session) throws Exception {
130 session.write("B");
131 }
132
133 @Override
134 public void messageSent(IoSession session, Object message)
135 throws Exception {
136 session.close(true);
137 }
138 });
139
140 ConnectFuture future = connector.connect(new VmPipeAddress(1));
141
142 future.awaitUninterruptibly();
143 future.getSession().getCloseFuture().awaitUninterruptibly();
144 acceptor.dispose();
145 connector.dispose();
146
147
148
149 while (actual.indexOf("C") < 0) {
150 Thread.yield();
151 }
152
153 Assert.assertEquals("ABC", actual.toString());
154 }
155
156 public void testSessionCreated() throws Exception {
157 final Semaphore semaphore = new Semaphore(0);
158 final StringBuffer stringBuffer = new StringBuffer();
159 VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
160 final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
161 vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
162 @Override
163 public void sessionCreated(IoSession session) throws Exception {
164
165
166
167
168 Thread.sleep(1000);
169 stringBuffer.append("A");
170 }
171
172 @Override
173 public void sessionOpened(IoSession session) throws Exception {
174 stringBuffer.append("B");
175 }
176
177 @Override
178 public void messageReceived(IoSession session, Object message)
179 throws Exception {
180 stringBuffer.append("C");
181 }
182
183 @Override
184 public void sessionClosed(IoSession session) throws Exception {
185 stringBuffer.append("D");
186 semaphore.release();
187 }
188 });
189 vmPipeAcceptor.bind(vmPipeAddress);
190
191 final VmPipeConnector vmPipeConnector = new VmPipeConnector();
192 vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter());
193 vmPipeConnector.setHandler(new IoHandlerAdapter());
194 ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
195 connectFuture.awaitUninterruptibly();
196 connectFuture.getSession().write(IoBuffer.wrap(new byte[1]));
197 connectFuture.getSession().close(false).awaitUninterruptibly();
198
199 semaphore.tryAcquire(1, TimeUnit.SECONDS);
200 vmPipeAcceptor.unbind(vmPipeAddress);
201 Assert.assertEquals(1, connectFuture.getSession().getWrittenBytes());
202 Assert.assertEquals("ABCD", stringBuffer.toString());
203 }
204 }