1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.util.internal;
20
21 import java.io.BufferedInputStream;
22 import java.io.BufferedOutputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import java.net.SocketOption;
29 import java.nio.channels.AsynchronousChannelGroup;
30 import java.nio.channels.AsynchronousServerSocketChannel;
31 import java.nio.channels.AsynchronousSocketChannel;
32 import java.nio.charset.StandardCharsets;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.junit.Test;
45
46 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
47 import static java.net.StandardSocketOptions.SO_REUSEADDR;
48 import static java.net.StandardSocketOptions.TCP_NODELAY;
49 import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
50 import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
51 import static org.assertj.core.api.Assertions.assertThat;
52
53
54
55
56 @SuppressWarnings("checkstyle:magicnumber")
57 public class AsyncSocketTest {
58 private static final String LONG_STRING =
59 "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
60
61 private final CountDownLatch barrier = new CountDownLatch(1);
62 private final AtomicLong writeTime = new AtomicLong();
63 private final AtomicLong readTime = new AtomicLong();
64
65 private volatile InetSocketAddress address;
66
67 @Test(timeout = 10_000L)
68 public void test() throws Exception {
69 int forks = 2;
70 ThreadFactory factory = DaemonThreadFactory.newDaemonThreadFactory();
71 ExecutorService executorService = Executors.newCachedThreadPool(factory);
72 if (executorService instanceof ThreadPoolExecutor) {
73 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
74 threadPoolExecutor.setCorePoolSize(
75 Math.min(forks, Runtime.getRuntime().availableProcessors()));
76 threadPoolExecutor.prestartCoreThread();
77 }
78 AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
79 AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
80 setTrueOptions(server, SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE);
81 InetAddress ip = InetAddress.getLoopbackAddress();
82 server.bind(new InetSocketAddress(ip, 0), 1);
83 address = (InetSocketAddress) server.getLocalAddress();
84
85 System.gc();
86 TimeUnit.SECONDS.sleep(3L);
87
88 Thread tc = new Thread() {
89 @Override
90 public void run() {
91 try {
92 client();
93 } catch (Exception e) {
94 e.printStackTrace();
95 }
96 }
97 };
98 tc.setDaemon(true);
99 tc.start();
100
101 Future<AsynchronousSocketChannel> acceptFuture = server.accept();
102 AsynchronousSocketChannel worker = acceptFuture.get();
103 if (!worker.isOpen()) {
104 throw new IOException("client socket closed");
105 }
106 final InputStream is = newInputStream(worker);
107 final OutputStream os = new BufferedOutputStream(newOutputStream(worker), 64 * 1024);
108
109 Thread tt = new Thread() {
110 public void run() {
111 try {
112 byte[] b = new byte[1024];
113 is.read(b);
114 } catch (Exception e) {
115
116 }
117 }
118 };
119 tt.setName("fork-1-event-thread-");
120 tt.setDaemon(true);
121 tt.start();
122
123 Thread t = new Thread() {
124 @SuppressWarnings("checkstyle:magicnumber")
125 public void run() {
126 try {
127 byte[] data = LONG_STRING.getBytes(StandardCharsets.US_ASCII);
128 long t1 = System.currentTimeMillis();
129 int i = 0;
130 for (; i < 320_000; i++) {
131 os.write(data);
132 long t2 = System.currentTimeMillis();
133 long spent = t2 - t1;
134
135 if (i % 100_000 == 0) {
136 System.out.println(spent + "ms: " + i);
137 }
138 }
139 os.flush();
140 long spent = System.currentTimeMillis() - t1;
141 writeTime.set(spent);
142 System.out.println(spent + "ms: " + i);
143 } catch (IOException e) {
144 e.printStackTrace();
145 }
146 }
147 };
148 t.setName("commands-fork-1");
149 t.setDaemon(true);
150 t.start();
151
152 barrier.await();
153 tt.join();
154 t.join();
155 tc.join();
156 worker.close();
157 server.close();
158
159
160
161 assertThat(writeTime.get()).isLessThan(1000L);
162
163
164
165 assertThat(readTime.get()).isLessThan(1000L);
166 }
167
168 @SuppressWarnings("checkstyle:magicnumber")
169 private void client() throws Exception {
170 InetSocketAddress hostAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), address.getPort());
171 AsynchronousSocketChannel clientSocketChannel = AsynchronousSocketChannel.open();
172 clientSocketChannel.connect(hostAddress).get();
173 InputStream is = new BufferedInputStream(newInputStream(clientSocketChannel), 64 * 1024);
174 List<byte[]> bytes = new ArrayList<>();
175 long t1 = System.currentTimeMillis();
176 for (int i = 0; i < 320_000; i++) {
177 byte[] b = new byte[100];
178 is.read(b);
179 bytes.add(b);
180 }
181 long t2 = System.currentTimeMillis();
182 long spent = t2 - t1;
183 readTime.set(spent);
184 System.out.println(new String(bytes.get(bytes.size() - 1)));
185 System.out.println("received within " + spent + "ms");
186 clientSocketChannel.close();
187 barrier.countDown();
188 }
189
190 @SafeVarargs
191 private static void setTrueOptions(AsynchronousServerSocketChannel server, SocketOption<Boolean>... options)
192 throws IOException {
193 for (SocketOption<Boolean> option : options) {
194 if (server.supportedOptions().contains(option)) {
195 server.setOption(option, true);
196 }
197 }
198 }
199 }