1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package testcase;
21
22 import org.apache.mina.core.buffer.IoBuffer;
23 import org.apache.mina.core.future.ConnectFuture;
24 import org.apache.mina.core.future.IoFutureListener;
25 import org.apache.mina.core.service.IoHandlerAdapter;
26 import org.apache.mina.core.session.IoSession;
27 import org.apache.mina.filter.codec.ProtocolCodecFilter;
28 import org.apache.mina.filter.executor.ExecutorFilter;
29 import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
30 import org.apache.mina.transport.socket.SocketAcceptor;
31 import org.apache.mina.transport.socket.SocketConnector;
32 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33 import org.apache.mina.transport.socket.nio.NioSocketConnector;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43
44
45
46
47
48 public class MinaRegressionTest extends IoHandlerAdapter {
49 private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class);
50
51 public static final int MSG_SIZE = 5000;
52
53 public static final int MSG_COUNT = 10;
54
55 private static final int PORT = 23234;
56
57 private static final int BUFFER_SIZE = 8192;
58
59 private static final int TIMEOUT = 10000;
60
61 public static final String OPEN = "open";
62
63 public SocketAcceptor acceptor;
64
65 public SocketConnector connector;
66
67 private final Object LOCK = new Object();
68
69 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
70 public Thread newThread(final Runnable r) {
71 return new Thread(null, r, "MinaThread", 64 * 1024);
72 }
73 };
74
75 private OrderedThreadPoolExecutor executor;
76
77 public static AtomicInteger sent = new AtomicInteger(0);
78
79 public MinaRegressionTest() throws IOException {
80 executor = new OrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, THREAD_FACTORY);
81
82 acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
83 acceptor.setReuseAddress(true);
84 acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
85
86 acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
87 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));
88
89 connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1);
90
91 connector.setConnectTimeoutMillis(TIMEOUT);
92 connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE);
93 connector.getSessionConfig().setReuseAddress(true);
94 }
95
96 public void connect() throws Exception {
97 final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);
98
99 acceptor.setHandler(new MyIoHandler(LOCK));
100
101 acceptor.bind(socketAddress);
102 connector.setHandler(this);
103
104 final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() {
105 public void operationComplete(ConnectFuture future) {
106 try {
107 logger.info("Write message to session " + future.getSession().getId());
108 final IoSession s = future.getSession();
109 IoBuffer wb = IoBuffer.allocate(MSG_SIZE);
110 wb.put(new byte[MSG_SIZE]);
111 wb.flip();
112 s.write(wb);
113 } catch (Exception e) {
114 logger.error("Can't send message: {}", e.getMessage());
115 }
116 }
117 };
118
119 for (int i = 0; i < MSG_COUNT; i++) {
120 ConnectFuture future = connector.connect(socketAddress);
121 future.addListener(listener);
122 }
123
124 synchronized (LOCK) {
125 LOCK.wait(50000);
126 }
127
128 connector.dispose();
129 acceptor.unbind();
130 acceptor.dispose();
131 executor.shutdownNow();
132
133 logger.info("Received: " + MyIoHandler.received.intValue());
134 logger.info("Sent: " + sent.intValue());
135 logger.info("FINISH");
136 }
137
138 @Override
139 public void exceptionCaught(IoSession session, Throwable cause) {
140 if (!(cause instanceof IOException)) {
141 logger.error("Exception: ", cause);
142 } else {
143 logger.info("I/O error: " + cause.getMessage());
144 }
145 session.close(true);
146 }
147
148 @Override
149 public void messageSent(IoSession session, Object message) throws Exception {
150 sent.incrementAndGet();
151 }
152
153 public static void main(String[] args) throws Exception {
154 logger.info("START");
155 new MinaRegressionTest().connect();
156 }
157 }