001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.service;
021
022import org.apache.mina.core.future.CloseFuture;
023import org.apache.mina.core.future.ConnectFuture;
024import org.apache.mina.core.future.IoFuture;
025import org.apache.mina.core.future.IoFutureListener;
026import org.apache.mina.core.session.IdleStatus;
027import org.apache.mina.core.session.IoSession;
028import org.apache.mina.filter.codec.ProtocolCodecFilter;
029import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
030import org.apache.mina.filter.logging.LoggingFilter;
031import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
032import org.apache.mina.transport.socket.nio.NioSocketConnector;
033import org.junit.Test;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import java.io.IOException;
038import java.net.InetSocketAddress;
039import java.nio.charset.Charset;
040import java.util.ArrayList;
041import java.util.List;
042import java.util.concurrent.CountDownLatch;
043
044/**
045 * test the AbstractIoService
046 *
047 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
048 */
049public class AbstractIoServiceTest {
050
051    private static final int PORT = 9123;
052
053    @Test
054    public void testDispose() throws IOException, InterruptedException {
055
056        List<String> threadsBefore = getThreadNames();
057
058        final IoAcceptor acceptor = new NioSocketAcceptor();
059
060        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
061        acceptor.getFilterChain().addLast("codec",
062                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
063
064        acceptor.setHandler(new ServerHandler());
065
066        acceptor.getSessionConfig().setReadBufferSize(2048);
067        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
068        acceptor.bind(new InetSocketAddress(PORT));
069        System.out.println("Server running ...");
070
071        final NioSocketConnector connector = new NioSocketConnector();
072
073        // Set connect timeout.
074        connector.setConnectTimeoutMillis(30 * 1000L);
075
076        connector.setHandler(new ClientHandler());
077        connector.getFilterChain().addLast("logger", new LoggingFilter());
078        connector.getFilterChain().addLast("codec",
079                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
080
081        // Start communication.
082        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
083        cf.awaitUninterruptibly();
084
085        IoSession session = cf.getSession();
086
087        // send a message
088        session.write("Hello World!\r");
089
090        // wait until response is received
091        CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
092        latch.await();
093
094        // close the session
095        CloseFuture closeFuture = session.close(false);
096
097        System.out.println("session.close called");
098        //Thread.sleep(5);
099
100        // wait for session close and then dispose the connector
101        closeFuture.addListener(new IoFutureListener<IoFuture>() {
102
103            public void operationComplete(IoFuture future) {
104                System.out.println("managed session count=" + connector.getManagedSessionCount());
105                System.out.println("Disposing connector ...");
106                connector.dispose(true);
107                System.out.println("Disposing connector ... *finished*");
108
109            }
110        });
111
112        closeFuture.awaitUninterruptibly();
113        acceptor.dispose(true);
114
115        List<String> threadsAfter = getThreadNames();
116
117        System.out.println("threadsBefore = " + threadsBefore);
118        System.out.println("threadsAfter  = " + threadsAfter);
119
120        // Assert.assertEquals(threadsBefore, threadsAfter);
121
122    }
123
124    public static class ClientHandler extends IoHandlerAdapter {
125
126        private static final Logger LOGGER = LoggerFactory.getLogger("CLIENT");
127
128        @Override
129        public void sessionCreated(IoSession session) throws Exception {
130            session.setAttribute("latch", new CountDownLatch(1));
131        }
132
133        @Override
134        public void messageReceived(IoSession session, Object message) throws Exception {
135            LOGGER.info("client: messageReceived(" + session + ", " + message + ")");
136            CountDownLatch latch = (CountDownLatch) session.getAttribute("latch");
137            latch.countDown();
138        }
139
140        @Override
141        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
142            LOGGER.warn("exceptionCaught:", cause);
143        }
144    }
145
146    public static class ServerHandler extends IoHandlerAdapter {
147
148        private static final Logger LOGGER = LoggerFactory.getLogger("SERVER");
149
150        @Override
151        public void messageReceived(IoSession session, Object message) throws Exception {
152            LOGGER.info("server: messageReceived(" + session + ", " + message + ")");
153            session.write(message.toString());
154        }
155
156        @Override
157        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
158            LOGGER.warn("exceptionCaught:", cause);
159        }
160
161    }
162
163    public static void main(String[] args) throws IOException, InterruptedException {
164        new AbstractIoServiceTest().testDispose();
165    }
166
167    private List<String> getThreadNames() {
168        List<String> list = new ArrayList<String>();
169        int active = Thread.activeCount();
170        Thread[] threads = new Thread[active];
171        Thread.enumerate(threads);
172        for (Thread thread : threads) {
173            try {
174                String name = thread.getName();
175                list.add(name);
176            } catch (NullPointerException ignore) {
177            }
178        }
179        return list;
180    }
181
182}