001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package testcase; 021 022import org.apache.mina.core.buffer.IoBuffer; 023import org.apache.mina.core.future.ConnectFuture; 024import org.apache.mina.core.future.IoFutureListener; 025import org.apache.mina.core.service.IoHandlerAdapter; 026import org.apache.mina.core.session.IoSession; 027import org.apache.mina.filter.codec.ProtocolCodecFilter; 028import org.apache.mina.filter.executor.ExecutorFilter; 029import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; 030import org.apache.mina.transport.socket.SocketAcceptor; 031import org.apache.mina.transport.socket.SocketConnector; 032import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 033import org.apache.mina.transport.socket.nio.NioSocketConnector; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import java.io.IOException; 038import java.net.InetSocketAddress; 039import java.util.concurrent.ThreadFactory; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043/** 044 * TODO : Add documentation 045 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 046 * 047 */ 048public class MinaRegressionTest extends IoHandlerAdapter { 049 private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class); 050 051 public static final int MSG_SIZE = 5000; 052 053 public static final int MSG_COUNT = 10; 054 055 private static final int PORT = 23234; 056 057 private static final int BUFFER_SIZE = 8192; 058 059 private static final int TIMEOUT = 10000; 060 061 public static final String OPEN = "open"; 062 063 public SocketAcceptor acceptor; 064 065 public SocketConnector connector; 066 067 private final Object LOCK = new Object(); 068 069 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { 070 public Thread newThread(final Runnable r) { 071 return new Thread(null, r, "MinaThread", 64 * 1024); 072 } 073 }; 074 075 private OrderedThreadPoolExecutor executor; 076 077 public static AtomicInteger sent = new AtomicInteger(0); 078 079 public MinaRegressionTest() throws IOException { 080 executor = new OrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, THREAD_FACTORY); 081 082 acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1); 083 acceptor.setReuseAddress(true); 084 acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE); 085 086 acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor)); 087 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory())); 088 089 connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1); 090 091 connector.setConnectTimeoutMillis(TIMEOUT); 092 connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE); 093 connector.getSessionConfig().setReuseAddress(true); 094 } 095 096 public void connect() throws Exception { 097 final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT); 098 099 acceptor.setHandler(new MyIoHandler(LOCK)); 100 101 acceptor.bind(socketAddress); 102 connector.setHandler(this); 103 104 final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() { 105 public void operationComplete(ConnectFuture future) { 106 try { 107 logger.info("Write message to session " + future.getSession().getId()); 108 final IoSession s = future.getSession(); 109 IoBuffer wb = IoBuffer.allocate(MSG_SIZE); 110 wb.put(new byte[MSG_SIZE]); 111 wb.flip(); 112 s.write(wb); 113 } catch (Exception e) { 114 logger.error("Can't send message: {}", e.getMessage()); 115 } 116 } 117 }; 118 119 for (int i = 0; i < MSG_COUNT; i++) { 120 ConnectFuture future = connector.connect(socketAddress); 121 future.addListener(listener); 122 } 123 124 synchronized (LOCK) { 125 LOCK.wait(50000); 126 } 127 128 connector.dispose(); 129 acceptor.unbind(); 130 acceptor.dispose(); 131 executor.shutdownNow(); 132 133 logger.info("Received: " + MyIoHandler.received.intValue()); 134 logger.info("Sent: " + sent.intValue()); 135 logger.info("FINISH"); 136 } 137 138 @Override 139 public void exceptionCaught(IoSession session, Throwable cause) { 140 if (!(cause instanceof IOException)) { 141 logger.error("Exception: ", cause); 142 } else { 143 logger.info("I/O error: " + cause.getMessage()); 144 } 145 session.closeNow(); 146 } 147 148 @Override 149 public void messageSent(IoSession session, Object message) throws Exception { 150 sent.incrementAndGet(); 151 } 152 153 public static void main(String[] args) throws Exception { 154 logger.info("START"); 155 new MinaRegressionTest().connect(); 156 } 157}