1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import org.apache.hc.core5.concurrent.FutureCallback;
37 import org.apache.hc.core5.http.HttpHost;
38 import org.apache.hc.core5.http.URIScheme;
39 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
40 import org.apache.hc.core5.http2.HttpVersionPolicy;
41 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
42 import org.apache.hc.core5.http2.nio.command.PingCommand;
43 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
44 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
45 import org.apache.hc.core5.reactor.Command;
46 import org.apache.hc.core5.reactor.IOReactorConfig;
47 import org.apache.hc.core5.reactor.IOSession;
48 import org.apache.hc.core5.reactor.ListenerEndpoint;
49 import org.apache.hc.core5.testing.nio.extension.H2AsyncServerResource;
50 import org.apache.hc.core5.testing.nio.extension.H2MultiplexingRequesterResource;
51 import org.apache.hc.core5.util.Timeout;
52 import org.junit.jupiter.api.Assertions;
53 import org.junit.jupiter.api.BeforeEach;
54 import org.junit.jupiter.api.Test;
55 import org.junit.jupiter.api.extension.RegisterExtension;
56
57 public class H2ConnPoolTest {
58
59 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
60
61 private final AtomicLong clientConnCount;
62 @RegisterExtension
63 private final H2AsyncServerResource serverResource;
64 @RegisterExtension
65 private final H2MultiplexingRequesterResource clientResource;
66
67 public H2ConnPoolTest() throws Exception {
68 this.serverResource = new H2AsyncServerResource(bootstrap -> bootstrap
69 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
70 .setIOReactorConfig(
71 IOReactorConfig.custom()
72 .setSoTimeout(TIMEOUT)
73 .build())
74 .register("*", () -> new EchoHandler(2048))
75 );
76
77 this.clientConnCount = new AtomicLong();
78 this.clientResource = new H2MultiplexingRequesterResource(bootstrap -> bootstrap
79 .setIOReactorConfig(IOReactorConfig.custom()
80 .setSoTimeout(TIMEOUT)
81 .build())
82 .setIOSessionListener(new LoggingIOSessionListener() {
83
84 @Override
85 public void connected(final IOSession session) {
86 clientConnCount.incrementAndGet();
87 super.connected(session);
88 }
89
90 })
91 );
92 }
93
94 @BeforeEach
95 public void resetCounts() {
96 clientConnCount.set(0);
97 }
98
99 @Test
100 public void testManyGetSession() throws Exception {
101 final int n = 200;
102
103 final HttpAsyncServer server = serverResource.start();
104 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), URIScheme.HTTP);
105 final ListenerEndpoint listener = future.get();
106 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
107 final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", address.getPort());
108
109 final H2MultiplexingRequester requester = clientResource.start();
110 final H2ConnPool connPool = requester.getConnPool();
111 final CountDownLatch latch = new CountDownLatch(n);
112 for (int i = 0; i < n; i++) {
113 connPool.getSession(target, TIMEOUT, new FutureCallback<IOSession>() {
114
115 @Override
116 public void completed(final IOSession session) {
117 session.enqueue(new PingCommand(new BasicPingHandler(
118 result -> {
119 latch.countDown();
120 })), Command.Priority.IMMEDIATE);
121 }
122
123 @Override
124 public void failed(final Exception ex) {
125 latch.countDown();
126 }
127
128 @Override
129 public void cancelled() {
130 latch.countDown();
131 }
132
133 });
134 }
135 Assertions.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
136
137 requester.initiateShutdown();
138 requester.awaitShutdown(TIMEOUT);
139
140 Assertions.assertEquals(1, clientConnCount.get());
141 }
142
143 @Test
144 public void testManyGetSessionFailures() throws Exception {
145 final int n = 200;
146
147 final HttpHost target = new HttpHost(URIScheme.HTTP.id, "pampa.invalid", 8888);
148
149 final H2MultiplexingRequester requester = clientResource.start();
150 final H2ConnPool connPool = requester.getConnPool();
151 final CountDownLatch latch = new CountDownLatch(n);
152 final ConcurrentLinkedQueue<Long> concurrentConnections = new ConcurrentLinkedQueue<>();
153 for (int i = 0; i < n; i++) {
154 connPool.getSession(target, TIMEOUT, new FutureCallback<IOSession>() {
155
156 @Override
157 public void completed(final IOSession session) {
158 latch.countDown();
159 }
160
161 @Override
162 public void failed(final Exception ex) {
163 latch.countDown();
164 }
165
166 @Override
167 public void cancelled() {
168 latch.countDown();
169 }
170
171 });
172 }
173
174 requester.initiateShutdown();
175 requester.awaitShutdown(TIMEOUT);
176
177 Assertions.assertEquals(0, clientConnCount.get());
178 }
179
180 }