View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.mina.transport.vmpipe;
20  
21  import static org.junit.Assert.fail;
22  
23  import java.lang.management.ManagementFactory;
24  import java.lang.management.ThreadInfo;
25  import java.lang.management.ThreadMXBean;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import org.apache.mina.core.future.ConnectFuture;
31  import org.apache.mina.core.service.IoAcceptor;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoHandlerAdapter;
34  import org.apache.mina.core.session.IoSession;
35  import org.junit.Test;
36  
37  /**
38   * TODO Add documentation
39   * 
40   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
41   */
42  public class VmPipeSessionCrossCommunicationTest {
43      @Test
44      public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
45          final VmPipeAddress address = new VmPipeAddress(1);
46          final IoConnector connector = new VmPipeConnector();
47          final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
48          final CountDownLatch latch = new CountDownLatch(1);
49          final CountDownLatch messageCount = new CountDownLatch(2);
50          IoAcceptor acceptor = new VmPipeAcceptor();
51  
52          acceptor.setHandler(new IoHandlerAdapter() {
53              @Override
54              public void messageReceived(IoSession session, Object message) throws Exception {
55                  //System.out.println(Thread.currentThread().getName() + ": " + message);
56  
57                  if ("start".equals(message)) {
58                      session.write("open new");
59                  } else if ("re-use c1".equals(message)) {
60                      session.write("tell me something on c1 now");
61                  } else if (((String) message).startsWith("please don't deadlock")) {
62                      messageCount.countDown();
63                  } else {
64                      fail("unexpected message received " + message);
65                  }
66              }
67          });
68          acceptor.bind(address);
69  
70          connector.setHandler(new IoHandlerAdapter() {
71              @Override
72              public void messageReceived(IoSession session, Object message) throws Exception {
73                  //System.out.println(Thread.currentThread().getName() + ": " + message);
74  
75                  if ("open new".equals(message)) {
76                      //System.out.println("opening c2 from " + Thread.currentThread().getName());
77  
78                      IoConnector c2 = new VmPipeConnector();
79                      c2.setHandler(new IoHandlerAdapter() {
80                          @Override
81                          public void sessionOpened(IoSession session) throws Exception {
82                              session.write("re-use c1");
83                          }
84  
85                          @Override
86                          public void messageReceived(IoSession session, Object message) throws Exception {
87                              //System.out.println(Thread.currentThread().getName() + ": " + message);
88  
89                              if ("tell me something on c1 now".equals(message)) {
90                                  latch.countDown();
91                                  c1.get().write("please don't deadlock via c1");
92                              } else {
93                                  fail("unexpected message received " + message);
94                              }
95                          }
96                      });
97  
98                      ConnectFuture c2Future = c2.connect(address);
99  
100                     c2Future.await();
101 
102                     latch.await();
103 
104                     c2Future.getSession().write("please don't deadlock via c2");
105                 } else {
106                     fail("unexpeced message received " + message);
107                 }
108             }
109         });
110 
111         ConnectFuture future = connector.connect(address);
112 
113         future.await();
114 
115         c1.set(future.getSession());
116         c1.get().write("start");
117 
118         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
119 
120         while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
121             long[] threads = threadMXBean.findMonitorDeadlockedThreads();
122 
123             if (null != threads) {
124                 StringBuffer sb = new StringBuffer(256);
125                 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
126 
127                 for (ThreadInfo info : infos) {
128                     sb.append(info.getThreadName())
129                             .append(" blocked on ")
130                             .append(info.getLockName())
131                             .append(" owned by ")
132                             .append(info.getLockOwnerName())
133                             .append("\n");
134                 }
135 
136                 for (ThreadInfo info : infos) {
137                     sb.append("\nStack for ").append(info.getThreadName()).append("\n");
138                     for (StackTraceElement element : info.getStackTrace()) {
139                         sb.append("\t").append(element).append("\n");
140                     }
141                 }
142 
143                 fail("deadlocked! \n" + sb);
144             }
145         }
146 
147         acceptor.setCloseOnDeactivation(false);
148         acceptor.dispose();
149     }
150 }