1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.mina.transport.vmpipe;
20  
21  import java.lang.management.ManagementFactory;
22  import java.lang.management.ThreadInfo;
23  import java.lang.management.ThreadMXBean;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import junit.framework.TestCase;
29  
30  import org.apache.mina.core.future.ConnectFuture;
31  import org.apache.mina.core.service.IoAcceptor;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoHandlerAdapter;
34  import org.apache.mina.core.session.IoSession;
35  
36  /**
37   * TODO Add documentation
38   * 
39   * @author The Apache MINA Project (dev@mina.apache.org)
40   * @version $Rev: $, $Date:  $
41   */
42  public class VmPipeSessionCrossCommunicationTest extends TestCase {
43      public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
44          final VmPipeAddress address = new VmPipeAddress(1);
45          final IoConnector connector = new VmPipeConnector();
46          final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
47          final CountDownLatch latch = new CountDownLatch(1);
48          final CountDownLatch messageCount = new CountDownLatch(2);
49          IoAcceptor acceptor = new VmPipeAcceptor();
50  
51          acceptor.setHandler(new IoHandlerAdapter() {
52              @Override
53              public void messageReceived(IoSession session, Object message) throws Exception {
54                  //System.out.println(Thread.currentThread().getName() + ": " + message);
55  
56                  if ("start".equals(message)) {
57                      session.write("open new");
58                  } else if ("re-use c1".equals(message)) {
59                      session.write("tell me something on c1 now");
60                  } else if (((String) message).startsWith("please don't deadlock")) {
61                      messageCount.countDown();
62                  } else {
63                      fail("unexpected message received " + message);
64                  }
65              }
66          });
67          acceptor.bind(address);
68  
69          connector.setHandler(new IoHandlerAdapter() {
70              @Override
71              public void messageReceived(IoSession session, Object message) throws Exception {
72                  //System.out.println(Thread.currentThread().getName() + ": " + message);
73  
74                  if ("open new".equals(message)) {
75                      //System.out.println("opening c2 from " + Thread.currentThread().getName());
76  
77                      IoConnector c2 = new VmPipeConnector();
78                      c2.setHandler(new IoHandlerAdapter() {
79                          @Override
80                          public void sessionOpened(IoSession session) throws Exception {
81                              session.write("re-use c1");
82                          }
83  
84                          @Override
85                          public void messageReceived(IoSession session, Object message) throws Exception {
86                              //System.out.println(Thread.currentThread().getName() + ": " + message);
87  
88                              if ("tell me something on c1 now".equals(message)) {
89                                  latch.countDown();
90                                  c1.get().write("please don't deadlock via c1");
91                              } else {
92                                  fail("unexpected message received " + message);
93                              }
94                          }
95                      });
96  
97                      ConnectFuture c2Future = c2.connect(address);
98  
99                      c2Future.await();
100 
101                     latch.await();
102 
103                     c2Future.getSession().write("please don't deadlock via c2");
104                 } else {
105                     fail("unexpeced message received " + message);
106                 }
107             }
108         });
109 
110         ConnectFuture future = connector.connect(address);
111 
112         future.await();
113 
114         c1.set(future.getSession());
115         c1.get().write("start");
116 
117         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
118 
119         while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
120             long[] threads = threadMXBean.findMonitorDeadlockedThreads();
121 
122             if (null != threads) {
123                 StringBuffer sb = new StringBuffer(256);
124                 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
125 
126                 for (ThreadInfo info : infos) {
127                     sb.append(info.getThreadName())
128                             .append(" blocked on ")
129                             .append(info.getLockName())
130                             .append(" owned by ")
131                             .append(info.getLockOwnerName())
132                             .append("\n");
133                 }
134 
135                 for (ThreadInfo info : infos) {
136                     sb.append("\nStack for ").append(info.getThreadName()).append("\n");
137                     for (StackTraceElement element : info.getStackTrace()) {
138                         sb.append("\t").append(element).append("\n");
139                     }
140                 }
141 
142                 fail("deadlocked! \n" + sb);
143             }
144         }
145 
146         acceptor.setCloseOnDeactivation(false);
147         acceptor.dispose();
148     }
149 }