001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package testcase;
021
022import org.apache.mina.core.buffer.IoBuffer;
023import org.apache.mina.core.future.ConnectFuture;
024import org.apache.mina.core.future.IoFutureListener;
025import org.apache.mina.core.service.IoHandlerAdapter;
026import org.apache.mina.core.session.IoSession;
027import org.apache.mina.filter.codec.ProtocolCodecFilter;
028import org.apache.mina.filter.executor.ExecutorFilter;
029import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
030import org.apache.mina.transport.socket.SocketAcceptor;
031import org.apache.mina.transport.socket.SocketConnector;
032import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
033import org.apache.mina.transport.socket.nio.NioSocketConnector;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import java.io.IOException;
038import java.net.InetSocketAddress;
039import java.util.concurrent.ThreadFactory;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043/**
044 * TODO : Add documentation
045 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
046 *
047 */
048public class MinaRegressionTest extends IoHandlerAdapter {
049    private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class);
050
051    public static final int MSG_SIZE = 5000;
052
053    public static final int MSG_COUNT = 10;
054
055    private static final int PORT = 23234;
056
057    private static final int BUFFER_SIZE = 8192;
058
059    private static final int TIMEOUT = 10000;
060
061    public static final String OPEN = "open";
062
063    public SocketAcceptor acceptor;
064
065    public SocketConnector connector;
066
067    private final Object LOCK = new Object();
068
069    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
070        public Thread newThread(final Runnable r) {
071            return new Thread(null, r, "MinaThread", 64 * 1024);
072        }
073    };
074
075    private OrderedThreadPoolExecutor executor;
076
077    public static AtomicInteger sent = new AtomicInteger(0);
078
079    public MinaRegressionTest() throws IOException {
080        executor = new OrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, THREAD_FACTORY);
081
082        acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
083        acceptor.setReuseAddress(true);
084        acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
085
086        acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
087        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));
088
089        connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1);
090
091        connector.setConnectTimeoutMillis(TIMEOUT);
092        connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE);
093        connector.getSessionConfig().setReuseAddress(true);
094    }
095
096    public void connect() throws Exception {
097        final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);
098
099        acceptor.setHandler(new MyIoHandler(LOCK));
100
101        acceptor.bind(socketAddress);
102        connector.setHandler(this);
103
104        final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() {
105            public void operationComplete(ConnectFuture future) {
106                try {
107                    logger.info("Write message to session " + future.getSession().getId());
108                    final IoSession s = future.getSession();
109                    IoBuffer wb = IoBuffer.allocate(MSG_SIZE);
110                    wb.put(new byte[MSG_SIZE]);
111                    wb.flip();
112                    s.write(wb);
113                } catch (Exception e) {
114                    logger.error("Can't send message: {}", e.getMessage());
115                }
116            }
117        };
118
119        for (int i = 0; i < MSG_COUNT; i++) {
120            ConnectFuture future = connector.connect(socketAddress);
121            future.addListener(listener);
122        }
123
124        synchronized (LOCK) {
125            LOCK.wait(50000);
126        }
127
128        connector.dispose();
129        acceptor.unbind();
130        acceptor.dispose();
131        executor.shutdownNow();
132
133        logger.info("Received: " + MyIoHandler.received.intValue());
134        logger.info("Sent: " + sent.intValue());
135        logger.info("FINISH");
136    }
137
138    @Override
139    public void exceptionCaught(IoSession session, Throwable cause) {
140        if (!(cause instanceof IOException)) {
141            logger.error("Exception: ", cause);
142        } else {
143            logger.info("I/O error: " + cause.getMessage());
144        }
145        session.close(true);
146    }
147
148    @Override
149    public void messageSent(IoSession session, Object message) throws Exception {
150        sent.incrementAndGet();
151    }
152
153    public static void main(String[] args) throws Exception {
154        logger.info("START");
155        new MinaRegressionTest().connect();
156    }
157}