001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.mina.transport.vmpipe;
020
021import static org.junit.Assert.fail;
022
023import java.lang.management.ManagementFactory;
024import java.lang.management.ThreadInfo;
025import java.lang.management.ThreadMXBean;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicReference;
029
030import org.apache.mina.core.future.ConnectFuture;
031import org.apache.mina.core.service.IoAcceptor;
032import org.apache.mina.core.service.IoConnector;
033import org.apache.mina.core.service.IoHandlerAdapter;
034import org.apache.mina.core.session.IoSession;
035import org.junit.Test;
036
037/**
038 * TODO Add documentation
039 * 
040 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
041 */
042public class VmPipeSessionCrossCommunicationTest {
043    @Test
044    public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
045        final VmPipeAddress address = new VmPipeAddress(1);
046        final IoConnector connector = new VmPipeConnector();
047        final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
048        final CountDownLatch latch = new CountDownLatch(1);
049        final CountDownLatch messageCount = new CountDownLatch(2);
050        IoAcceptor acceptor = new VmPipeAcceptor();
051
052        acceptor.setHandler(new IoHandlerAdapter() {
053            @Override
054            public void messageReceived(IoSession session, Object message) throws Exception {
055                //System.out.println(Thread.currentThread().getName() + ": " + message);
056
057                if ("start".equals(message)) {
058                    session.write("open new");
059                } else if ("re-use c1".equals(message)) {
060                    session.write("tell me something on c1 now");
061                } else if (((String) message).startsWith("please don't deadlock")) {
062                    messageCount.countDown();
063                } else {
064                    fail("unexpected message received " + message);
065                }
066            }
067        });
068        acceptor.bind(address);
069
070        connector.setHandler(new IoHandlerAdapter() {
071            @Override
072            public void messageReceived(IoSession session, Object message) throws Exception {
073                //System.out.println(Thread.currentThread().getName() + ": " + message);
074
075                if ("open new".equals(message)) {
076                    //System.out.println("opening c2 from " + Thread.currentThread().getName());
077
078                    IoConnector c2 = new VmPipeConnector();
079                    c2.setHandler(new IoHandlerAdapter() {
080                        @Override
081                        public void sessionOpened(IoSession session) throws Exception {
082                            session.write("re-use c1");
083                        }
084
085                        @Override
086                        public void messageReceived(IoSession session, Object message) throws Exception {
087                            //System.out.println(Thread.currentThread().getName() + ": " + message);
088
089                            if ("tell me something on c1 now".equals(message)) {
090                                latch.countDown();
091                                c1.get().write("please don't deadlock via c1");
092                            } else {
093                                fail("unexpected message received " + message);
094                            }
095                        }
096                    });
097
098                    ConnectFuture c2Future = c2.connect(address);
099
100                    c2Future.await();
101
102                    latch.await();
103
104                    c2Future.getSession().write("please don't deadlock via c2");
105                } else {
106                    fail("unexpeced message received " + message);
107                }
108            }
109        });
110
111        ConnectFuture future = connector.connect(address);
112
113        future.await();
114
115        c1.set(future.getSession());
116        c1.get().write("start");
117
118        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
119
120        while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
121            long[] threads = threadMXBean.findMonitorDeadlockedThreads();
122
123            if (null != threads) {
124                StringBuffer sb = new StringBuffer(256);
125                ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
126
127                for (ThreadInfo info : infos) {
128                    sb.append(info.getThreadName()).append(" blocked on ").append(info.getLockName())
129                            .append(" owned by ").append(info.getLockOwnerName()).append("\n");
130                }
131
132                for (ThreadInfo info : infos) {
133                    sb.append("\nStack for ").append(info.getThreadName()).append("\n");
134                    for (StackTraceElement element : info.getStackTrace()) {
135                        sb.append("\t").append(element).append("\n");
136                    }
137                }
138
139                fail("deadlocked! \n" + sb);
140            }
141        }
142
143        acceptor.setCloseOnDeactivation(false);
144        acceptor.dispose();
145    }
146}