View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package testcase;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.future.ConnectFuture;
24  import org.apache.mina.core.future.IoFutureListener;
25  import org.apache.mina.core.service.IoHandlerAdapter;
26  import org.apache.mina.core.session.IoSession;
27  import org.apache.mina.filter.codec.ProtocolCodecFilter;
28  import org.apache.mina.filter.executor.ExecutorFilter;
29  import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
30  import org.apache.mina.transport.socket.SocketAcceptor;
31  import org.apache.mina.transport.socket.SocketConnector;
32  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
33  import org.apache.mina.transport.socket.nio.NioSocketConnector;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import java.io.IOException;
38  import java.net.InetSocketAddress;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  /**
44   * TODO : Add documentation
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   *
47   */
48  public class MinaRegressionTest extends IoHandlerAdapter {
49    private static final Logger logger = LoggerFactory.getLogger(MinaRegressionTest.class);
50  
51    public static final int MSG_SIZE = 5000;
52    public static final int MSG_COUNT = 10;
53    private static final int PORT = 23234;
54    private static final int BUFFER_SIZE = 8192;
55    private static final int TIMEOUT = 10000;
56  
57    public static final String OPEN = "open";
58  
59    public SocketAcceptor acceptor;
60    public SocketConnector connector;
61  
62    private final Object LOCK = new Object();
63  
64    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
65      public Thread newThread(final Runnable r) {
66        return new Thread(null, r, "MinaThread", 64 * 1024);
67      }
68    };
69  
70    private OrderedThreadPoolExecutor executor;
71  
72    public static AtomicInteger sent = new AtomicInteger(0);
73  
74  
75    public MinaRegressionTest() throws IOException {
76      executor = new OrderedThreadPoolExecutor(
77        0,
78        1000,
79        60,
80        TimeUnit.SECONDS,
81        THREAD_FACTORY);
82  
83      acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1);
84      acceptor.setReuseAddress( true );
85      acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE);
86  
87      acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor));
88      acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));
89  
90      connector = new NioSocketConnector(Runtime.getRuntime().availableProcessors() + 1);
91  
92      connector.setConnectTimeoutMillis(TIMEOUT);
93      connector.getSessionConfig().setSendBufferSize(BUFFER_SIZE);
94      connector.getSessionConfig().setReuseAddress( true );
95    }
96  
97    public void connect() throws Exception {
98      final InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", PORT);
99  
100     acceptor.setHandler(new MyIoHandler(LOCK));
101 
102     acceptor.bind(socketAddress);
103     connector.setHandler(this);
104 
105     final IoFutureListener<ConnectFuture> listener = new IoFutureListener<ConnectFuture>() {
106       public void operationComplete(ConnectFuture future) {
107         try {logger.info( "Write message to session " + future.getSession().getId() );
108           final IoSession s = future.getSession();
109           IoBuffer wb = IoBuffer.allocate(MSG_SIZE);
110           wb.put(new byte[MSG_SIZE]);
111           wb.flip();
112           s.write(wb);
113         } catch (Exception e) {
114           logger.error("Can't send message: {}", e.getMessage());
115         }
116       }
117     };
118 
119     for (int i = 0; i < MSG_COUNT; i++) {
120       ConnectFuture future = connector.connect(socketAddress);
121       future.addListener(listener);
122     }
123 
124     synchronized (LOCK) {
125       LOCK.wait(50000);
126     }
127 
128     connector.dispose();
129     acceptor.unbind();
130     acceptor.dispose();
131     executor.shutdownNow();
132 
133     logger.info("Received: " + MyIoHandler.received.intValue());
134     logger.info("Sent: " + sent.intValue());
135     logger.info("FINISH");
136   }
137 
138   @Override
139   public void exceptionCaught(IoSession session, Throwable cause) {
140     if (!(cause instanceof IOException)) {
141       logger.error("Exception: ", cause);
142     } else {
143       logger.info("I/O error: " + cause.getMessage());
144     }
145     session.close(true);
146   }
147 
148   @Override
149   public void messageSent(IoSession session, Object message) throws Exception {
150     sent.incrementAndGet();
151   }
152 
153   public static void main(String[] args) throws Exception {
154     logger.info("START");
155     new MinaRegressionTest().connect();
156   }
157 }