1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import static org.junit.Assert.assertEquals;
23
24 import java.util.concurrent.Semaphore;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.future.ConnectFuture;
29 import org.apache.mina.core.service.IoAcceptor;
30 import org.apache.mina.core.service.IoConnector;
31 import org.apache.mina.core.service.IoHandlerAdapter;
32 import org.apache.mina.core.session.IoSession;
33 import org.apache.mina.filter.executor.ExecutorFilter;
34 import org.junit.Test;
35
36
37
38
39
40
41 public class VmPipeEventOrderTest {
42 @Test
43 public void testServerToClient() throws Exception {
44 IoAcceptor acceptor = new VmPipeAcceptor();
45 IoConnector connector = new VmPipeConnector();
46
47 acceptor.setHandler(new IoHandlerAdapter() {
48 @Override
49 public void sessionOpened(IoSession session) throws Exception {
50 session.write("B");
51 }
52
53 @Override
54 public void messageSent(IoSession session, Object message) throws Exception {
55 session.closeNow();
56 }
57 });
58
59 acceptor.bind(new VmPipeAddress(1));
60
61 final StringBuffer actual = new StringBuffer();
62
63 connector.setHandler(new IoHandlerAdapter() {
64
65 @Override
66 public void messageReceived(IoSession session, Object message) throws Exception {
67 actual.append(message);
68 }
69
70 @Override
71 public void sessionClosed(IoSession session) throws Exception {
72 actual.append("C");
73 }
74
75 @Override
76 public void sessionOpened(IoSession session) throws Exception {
77 actual.append("A");
78 }
79
80 });
81
82 ConnectFuture future = connector.connect(new VmPipeAddress(1));
83
84 future.awaitUninterruptibly();
85 future.getSession().getCloseFuture().awaitUninterruptibly();
86 acceptor.dispose();
87
88
89
90 while (actual.indexOf("C") < 0) {
91 Thread.yield();
92 }
93
94 assertEquals("ABC", actual.toString());
95 }
96
97 @Test
98 public void testClientToServer() throws Exception {
99 IoAcceptor acceptor = new VmPipeAcceptor();
100 IoConnector connector = new VmPipeConnector();
101
102 final StringBuffer actual = new StringBuffer();
103
104 acceptor.setHandler(new IoHandlerAdapter() {
105
106 @Override
107 public void messageReceived(IoSession session, Object message) throws Exception {
108 actual.append(message);
109 }
110
111 @Override
112 public void sessionClosed(IoSession session) throws Exception {
113 actual.append("C");
114 }
115
116 @Override
117 public void sessionOpened(IoSession session) throws Exception {
118 actual.append("A");
119 }
120
121 });
122
123 acceptor.bind(new VmPipeAddress(1));
124
125 connector.setHandler(new IoHandlerAdapter() {
126 @Override
127 public void sessionOpened(IoSession session) throws Exception {
128 session.write("B");
129 }
130
131 @Override
132 public void messageSent(IoSession session, Object message) throws Exception {
133 session.closeNow();
134 }
135 });
136
137 ConnectFuture future = connector.connect(new VmPipeAddress(1));
138
139 future.awaitUninterruptibly();
140 future.getSession().getCloseFuture().awaitUninterruptibly();
141 acceptor.dispose();
142 connector.dispose();
143
144
145
146 while (actual.indexOf("C") < 0) {
147 Thread.yield();
148 }
149
150 assertEquals("ABC", actual.toString());
151 }
152
153 @Test
154 public void testSessionCreated() throws Exception {
155 final Semaphore semaphore = new Semaphore(0);
156 final StringBuffer stringBuffer = new StringBuffer();
157 VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
158 final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
159 vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
160 @Override
161 public void sessionCreated(IoSession session) throws Exception {
162
163
164
165
166 Thread.sleep(1000);
167 stringBuffer.append("A");
168 }
169
170 @Override
171 public void sessionOpened(IoSession session) throws Exception {
172 stringBuffer.append("B");
173 }
174
175 @Override
176 public void messageReceived(IoSession session, Object message) throws Exception {
177 stringBuffer.append("C");
178 }
179
180 @Override
181 public void sessionClosed(IoSession session) throws Exception {
182 stringBuffer.append("D");
183 semaphore.release();
184 }
185 });
186 vmPipeAcceptor.bind(vmPipeAddress);
187
188 final VmPipeConnector vmPipeConnector = new VmPipeConnector();
189 vmPipeConnector.getFilterChain().addLast("executor", new ExecutorFilter());
190 vmPipeConnector.setHandler(new IoHandlerAdapter());
191 ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
192 connectFuture.awaitUninterruptibly();
193 connectFuture.getSession().write(IoBuffer.wrap(new byte[1])).awaitUninterruptibly();
194 connectFuture.getSession().closeOnFlush().awaitUninterruptibly();
195
196 semaphore.tryAcquire(1, TimeUnit.SECONDS);
197 vmPipeAcceptor.unbind(vmPipeAddress);
198 assertEquals(1, connectFuture.getSession().getWrittenBytes());
199 assertEquals("ABCD", stringBuffer.toString());
200 }
201 }