/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
20  package testcase;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.future.ConnectFuture;
24  import org.apache.mina.core.future.IoFutureListener;
25  import org.apache.mina.core.service.IoHandlerAdapter;
26  import org.apache.mina.core.session.IoSession;
27  import org.apache.mina.filter.codec.ProtocolCodecFilter;
28  import org.apache.mina.filter.executor.ExecutorFilter;
29  import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
30  import org.apache.mina.transport.socket.SocketAcceptor;
31  import org.apache.mina.transport.socket.SocketConnector;
32  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33  import org.apache.mina.transport.socket.nio.NioSocketConnector;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import java.io.IOException;
38  import java.net.InetSocketAddress;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  /**
44   * TODO : Add documentation
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   *
47   */
48  public class MinaRegressionTest extends IoHandlerAdapter {
49      private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class);
50  
51      public static final int MSG_SIZE = 5000;
52  
53      public static final int MSG_COUNT = 10;
54  
55      private static final int PORT = 23234;
56  
57      private static final int BUFFER_SIZE = 8192;
58  
59      private static final int TIMEOUT = 10000;
60  
61      public static final String OPEN = "open";
62  
63      public SocketAcceptor acceptor;
64  
65      public SocketConnector connector;
66  
67      private final Object LOCK = new Object();
68  
69      private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
70          public Thread newThread(final Runnable r) {
71              return new Thread(null, r, "MinaThread", 64 * 1024);
72          }
73      };
74  
75      private OrderedThreadPoolExecutor executor;
76  
77      public static AtomicInteger sent = new AtomicInteger(0);
78  
79      public MinaRegressionTest() throws IOException {
80          executor = new OrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, THREAD_FACTORY);
81  
82          acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
83          acceptor.setReuseAddress(true);
84          acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
85  
86          acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
87          acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));
88  
89          connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1);
90  
91          connector.setConnectTimeoutMillis(TIMEOUT);
92          connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE);
93          connector.getSessionConfig().setReuseAddress(true);
94      }
95  
96      public void connect() throws Exception {
97          final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);
98  
99          acceptor.setHandler(new MyIoHandler(LOCK));
100 
101         acceptor.bind(socketAddress);
102         connector.setHandler(this);
103 
104         final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() {
105             public void operationComplete(ConnectFuture future) {
106                 try {
107                     logger.info("Write message to session " + future.getSession().getId());
108                     final IoSession s = future.getSession();
109                     IoBuffer wb = IoBuffer.allocate(MSG_SIZE);
110                     wb.put(new byte[MSG_SIZE]);
111                     wb.flip();
112                     s.write(wb);
113                 } catch (Exception e) {
114                     logger.error("Can't send message: {}", e.getMessage());
115                 }
116             }
117         };
118 
119         for (int i = 0; i < MSG_COUNT; i++) {
120             ConnectFuture future = connector.connect(socketAddress);
121             future.addListener(listener);
122         }
123 
124         synchronized (LOCK) {
125             LOCK.wait(50000);
126         }
127 
128         connector.dispose();
129         acceptor.unbind();
130         acceptor.dispose();
131         executor.shutdownNow();
132 
133         logger.info("Received: " + MyIoHandler.received.intValue());
134         logger.info("Sent: " + sent.intValue());
135         logger.info("FINISH");
136     }
137 
138     @Override
139     public void exceptionCaught(IoSession session, Throwable cause) {
140         if (!(cause instanceof IOException)) {
141             logger.error("Exception: ", cause);
142         } else {
143             logger.info("I/O error: " + cause.getMessage());
144         }
145         session.closeNow();
146     }
147 
148     @Override
149     public void messageSent(IoSession session, Object message) throws Exception {
150         sent.incrementAndGet();
151     }
152 
153     public static void main(String[] args) throws Exception {
154         logger.info("START");
155         new MinaRegressionTest().connect();
156     }
157 }