1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package testcase;
21
22 import org.apache.mina.core.buffer.IoBuffer;
23 import org.apache.mina.core.future.ConnectFuture;
24 import org.apache.mina.core.future.IoFutureListener;
25 import org.apache.mina.core.service.IoHandlerAdapter;
26 import org.apache.mina.core.session.IoSession;
27 import org.apache.mina.filter.codec.ProtocolCodecFilter;
28 import org.apache.mina.filter.executor.ExecutorFilter;
29 import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
30 import org.apache.mina.transport.socket.SocketAcceptor;
31 import org.apache.mina.transport.socket.SocketConnector;
32 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33 import org.apache.mina.transport.socket.nio.NioSocketConnector;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43
44
45
46
47
48 public class MinaRegressionTest extends IoHandlerAdapter {
49 private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class);
50
51 public static final int MSG_SIZE = 5000;
52 public static final int MSG_COUNT = 10;
53 private static final int PORT = 23234;
54 private static final int BUFFER_SIZE = 8192;
55 private static final int TIMEOUT = 10000;
56
57 public static final String OPEN = "open";
58
59 public SocketAcceptor acceptor;
60 public SocketConnector connector;
61
62 private final Object LOCK = new Object();
63
64 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
65 public Thread newThread(final Runnable r) {
66 return new Thread(null, r, "MinaThread", 64 * 1024);
67 }
68 };
69
70 private OrderedThreadPoolExecutor executor;
71
72 public static AtomicInteger sent = new AtomicInteger(0);
73
74
75 public MinaRegressionTest() throws IOException {
76 executor = new OrderedThreadPoolExecutor(
77 0,
78 1000,
79 60,
80 TimeUnit.SECONDS,
81 THREAD_FACTORY);
82
83 acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
84 acceptor.setReuseAddress( true );
85 acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
86
87 acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
88 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));
89
90 connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1);
91
92 connector.setConnectTimeoutMillis(TIMEOUT);
93 connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE);
94 connector.getSessionConfig().setReuseAddress( true );
95 }
96
97 public void connect() throws Exception {
98 final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);
99
100 acceptor.setHandler(new MyIoHandler(LOCK));
101
102 acceptor.bind(socketAddress);
103 connector.setHandler(this);
104
105 final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() {
106 public void operationComplete(ConnectFuture future) {
107 try {logger.info( "Write message to session " + future.getSession().getId() );
108 final IoSession s = future.getSession();
109 IoBuffer wb = IoBuffer.allocate(MSG_SIZE);
110 wb.put(new byte[MSG_SIZE]);
111 wb.flip();
112 s.write(wb);
113 } catch (Exception e) {
114 logger.error("Can't send message: {}", e.getMessage());
115 }
116 }
117 };
118
119 for (int i = 0; i < MSG_COUNT; i++) {
120 ConnectFuture future = connector.connect(socketAddress);
121 future.addListener(listener);
122 }
123
124 synchronized (LOCK) {
125 LOCK.wait(50000);
126 }
127
128 connector.dispose();
129 acceptor.unbind();
130 acceptor.dispose();
131 executor.shutdownNow();
132
133 logger.info("Received: " + MyIoHandler.received.intValue());
134 logger.info("Sent: " + sent.intValue());
135 logger.info("FINISH");
136 }
137
138 @Override
139 public void exceptionCaught(IoSession session, Throwable cause) {
140 if (!(cause instanceof IOException)) {
141 logger.error("Exception: ", cause);
142 } else {
143 logger.info("I/O error: " + cause.getMessage());
144 }
145 session.close(true);
146 }
147
148 @Override
149 public void messageSent(IoSession session, Object message) throws Exception {
150 sent.incrementAndGet();
151 }
152
153 public static void main(String[] args) throws Exception {
154 logger.info("START");
155 new MinaRegressionTest().connect();
156 }
157 }