1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.transport.vmpipe;
20
21 import java.lang.management.ManagementFactory;
22 import java.lang.management.ThreadInfo;
23 import java.lang.management.ThreadMXBean;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import junit.framework.TestCase;
29
30 import org.apache.mina.core.future.ConnectFuture;
31 import org.apache.mina.core.service.IoAcceptor;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoHandlerAdapter;
34 import org.apache.mina.core.session.IoSession;
35
36
37
38
39
40
41
42 public class VmPipeSessionCrossCommunicationTest extends TestCase {
43 public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
44 final VmPipeAddress address = new VmPipeAddress(1);
45 final IoConnector connector = new VmPipeConnector();
46 final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
47 final CountDownLatch latch = new CountDownLatch(1);
48 final CountDownLatch messageCount = new CountDownLatch(2);
49 IoAcceptor acceptor = new VmPipeAcceptor();
50
51 acceptor.setHandler(new IoHandlerAdapter() {
52 @Override
53 public void messageReceived(IoSession session, Object message) throws Exception {
54
55
56 if ("start".equals(message)) {
57 session.write("open new");
58 } else if ("re-use c1".equals(message)) {
59 session.write("tell me something on c1 now");
60 } else if (((String) message).startsWith("please don't deadlock")) {
61 messageCount.countDown();
62 } else {
63 fail("unexpected message received " + message);
64 }
65 }
66 });
67 acceptor.bind(address);
68
69 connector.setHandler(new IoHandlerAdapter() {
70 @Override
71 public void messageReceived(IoSession session, Object message) throws Exception {
72
73
74 if ("open new".equals(message)) {
75
76
77 IoConnector c2 = new VmPipeConnector();
78 c2.setHandler(new IoHandlerAdapter() {
79 @Override
80 public void sessionOpened(IoSession session) throws Exception {
81 session.write("re-use c1");
82 }
83
84 @Override
85 public void messageReceived(IoSession session, Object message) throws Exception {
86
87
88 if ("tell me something on c1 now".equals(message)) {
89 latch.countDown();
90 c1.get().write("please don't deadlock via c1");
91 } else {
92 fail("unexpected message received " + message);
93 }
94 }
95 });
96
97 ConnectFuture c2Future = c2.connect(address);
98
99 c2Future.await();
100
101 latch.await();
102
103 c2Future.getSession().write("please don't deadlock via c2");
104 } else {
105 fail("unexpeced message received " + message);
106 }
107 }
108 });
109
110 ConnectFuture future = connector.connect(address);
111
112 future.await();
113
114 c1.set(future.getSession());
115 c1.get().write("start");
116
117 ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
118
119 while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
120 long[] threads = threadMXBean.findMonitorDeadlockedThreads();
121
122 if (null != threads) {
123 StringBuffer sb = new StringBuffer(256);
124 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
125
126 for (ThreadInfo info : infos) {
127 sb.append(info.getThreadName())
128 .append(" blocked on ")
129 .append(info.getLockName())
130 .append(" owned by ")
131 .append(info.getLockOwnerName())
132 .append("\n");
133 }
134
135 for (ThreadInfo info : infos) {
136 sb.append("\nStack for ").append(info.getThreadName()).append("\n");
137 for (StackTraceElement element : info.getStackTrace()) {
138 sb.append("\t").append(element).append("\n");
139 }
140 }
141
142 fail("deadlocked! \n" + sb);
143 }
144 }
145
146 acceptor.setCloseOnDeactivation(false);
147 acceptor.dispose();
148 }
149 }