View Javadoc
1   package org.apache.mina.transport.socket.nio;
2   
3   import org.apache.mina.core.buffer.IoBuffer;
4   import org.apache.mina.core.future.CloseFuture;
5   import org.apache.mina.core.future.ConnectFuture;
6   import org.apache.mina.core.future.WriteFuture;
7   import org.apache.mina.core.service.IoHandlerAdapter;
8   import org.apache.mina.core.session.IoSession;
9   import org.apache.mina.filter.codec.ProtocolCodecFilter;
10  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
11  import org.apache.mina.transport.socket.SocketAcceptor;
12  import org.apache.mina.transport.socket.SocketConnector;
13  import org.apache.mina.util.AvailablePortFinder;
14  import org.junit.After;
15  import org.junit.Assert;
16  import org.junit.Before;
17  import org.junit.Test;
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import java.net.InetSocketAddress;
22  import java.net.SocketAddress;
23  
24  public class DIRMINA1041Test {
25  
26      private static final Logger LOG = LoggerFactory.getLogger(DIRMINA1041Test.class);
27      private static final String HOST = "localhost";
28      private static final int PORT = AvailablePortFinder.getNextAvailable(); 
29      private static final long TIMEOUT = 10000L;
30      private static int counter = 0;
31      private SocketAcceptor acceptor;
32      private SocketConnector connector;
33      
34      @Before
35      public void setUp() throws Exception {
36          acceptor = new NioSocketAcceptor();
37          acceptor.setReuseAddress(true);
38          acceptor.setHandler(new SomeAcceptHandler());
39          acceptor.bind(new InetSocketAddress(HOST, PORT));
40  
41          connector = new NioSocketConnector();
42          connector.getSessionConfig().setReuseAddress(true);
43          connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
44          connector.setHandler(new SomeConnectHandler());
45      }
46  
47      @Test
48      public void testWrite() throws InterruptedException {
49          SocketAddress address = new InetSocketAddress(HOST, PORT);
50          
51          try {
52              for (int i = 0; i < 10000; i++) {
53                  ConnectFuture future = connector.connect( address);
54      
55                  if (!future.awaitUninterruptibly(TIMEOUT)) {
56                      
57                      Assert.fail("ConnectFuture did not complete.");
58                  }
59                  
60                  IoSession session = future.getSession();
61                  
62                  if ( i % 1000 == 0 ) {
63                      System.out.println("Loop " + i +", counter = " + counter);
64                  }
65      
66                  WriteFuture writeFuture = session.write("Test" + i);
67                  
68                  //LOG.info("Waiting for WriteFuture to complete. Session: " + session);
69                  if (!writeFuture.await(TIMEOUT)) {
70                      LOG.info("WriteFuture did not complete. Session: " + session);
71                      Assert.fail("WriteFuture did not complete. Session: " + session);
72                  }
73                  
74                  CloseFuture closeFuture = session.closeOnFlush();
75                  
76                  if (!closeFuture.awaitUninterruptibly(TIMEOUT)) {
77                      Assert.fail("CloseFuture did not complete.");
78                  }
79                  
80                  //Thread.sleep( 2 );
81              }
82          } catch (Exception e) {
83              e.printStackTrace();
84          }
85          
86          System.out.println("Done " + 100000  + " loops, counter = " + counter);
87      }
88  
89      @After
90      public void tearDown() throws Exception {
91          try { connector.dispose(true); } catch (Throwable e) { e.printStackTrace(); }
92          try { acceptor.unbind(); acceptor.dispose(true); } catch (Throwable e) { e.printStackTrace(); }
93      }
94  
95      private IoSession getSession() {
96          ConnectFuture future = connector.connect(new InetSocketAddress(HOST, PORT));
97          if (!future.awaitUninterruptibly(TIMEOUT)) {
98              
99              Assert.fail("ConnectFuture did not complete.");
100         }
101         return future.getSession();
102     }
103 
104     private void closeSession(IoSession session) {
105         CloseFuture closeFuture = session.closeNow();
106         if (!closeFuture.awaitUninterruptibly(TIMEOUT)) {
107             Assert.fail("CloseFuture did not complete.");
108         }
109     }
110 
111     private class SomeConnectHandler extends IoHandlerAdapter {
112         @Override
113         public void sessionClosed(IoSession session) throws Exception {
114             //LOG.info("Connector - Session closed : " + session);
115         }
116         
117         public void messageSent(IoSession session, Object message) throws Exception {
118             //LOG.info("message sent : " + message);
119         }
120     }
121 
122     private class SomeAcceptHandler extends IoHandlerAdapter {
123         @Override
124         public void messageReceived(IoSession session, Object message) throws Exception {
125             //LOG.info("Message received : " + ((IoBuffer)message).toString() );
126             counter++;
127             //session.closeNow();
128         }
129     }
130 }