1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.HttpRoute;
34 import org.apache.hc.client5.http.async.AsyncExecRuntime;
35 import org.apache.hc.client5.http.config.RequestConfig;
36 import org.apache.hc.client5.http.impl.ConnPoolSupport;
37 import org.apache.hc.client5.http.impl.Operations;
38 import org.apache.hc.client5.http.protocol.HttpClientContext;
39 import org.apache.hc.core5.concurrent.Cancellable;
40 import org.apache.hc.core5.concurrent.ComplexCancellable;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46 import org.apache.hc.core5.http.nio.HandlerFactory;
47 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.reactor.Command;
50 import org.apache.hc.core5.reactor.IOSession;
51 import org.apache.hc.core5.util.Identifiable;
52 import org.apache.hc.core5.util.TimeValue;
53 import org.apache.hc.core5.util.Timeout;
54 import org.slf4j.Logger;
55
56 class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57
58 private final Logger log;
59 private final InternalH2ConnPool connPool;
60 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61 private final AtomicReference<Endpoint> sessionRef;
62 private volatile boolean reusable;
63
64 InternalH2AsyncExecRuntime(
65 final Logger log,
66 final InternalH2ConnPool connPool,
67 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68 super();
69 this.log = log;
70 this.connPool = connPool;
71 this.pushHandlerFactory = pushHandlerFactory;
72 this.sessionRef = new AtomicReference<>();
73 }
74
75 @Override
76 public boolean isEndpointAcquired() {
77 return sessionRef.get() != null;
78 }
79
80 @Override
81 public Cancellable acquireEndpoint(
82 final String id,
83 final HttpRoute route,
84 final Object object,
85 final HttpClientContext context,
86 final FutureCallback<AsyncExecRuntime> callback) {
87 if (sessionRef.get() == null) {
88 final HttpHost target = route.getTargetHost();
89 final RequestConfig requestConfig = context.getRequestConfig();
90 @SuppressWarnings("deprecation")
91 final Timeout connectTimeout = requestConfig.getConnectTimeout();
92 if (log.isDebugEnabled()) {
93 log.debug("{} acquiring endpoint ({})", id, connectTimeout);
94 }
95 return Operations.cancellable(connPool.getSession(target, connectTimeout,
96 new FutureCallback<IOSession>() {
97
98 @Override
99 public void completed(final IOSession ioSession) {
100 sessionRef.set(new Endpoint(target, ioSession));
101 reusable = true;
102 if (log.isDebugEnabled()) {
103 log.debug("{} acquired endpoint", id);
104 }
105 callback.completed(InternalH2AsyncExecRuntime.this);
106 }
107
108 @Override
109 public void failed(final Exception ex) {
110 callback.failed(ex);
111 }
112
113 @Override
114 public void cancelled() {
115 callback.cancelled();
116 }
117
118 }));
119 }
120 callback.completed(this);
121 return Operations.nonCancellable();
122 }
123
124 private void closeEndpoint(final Endpoint endpoint) {
125 endpoint.session.close(CloseMode.GRACEFUL);
126 if (log.isDebugEnabled()) {
127 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
128 }
129 }
130
131 @Override
132 public void releaseEndpoint() {
133 final Endpoint endpoint = sessionRef.getAndSet(null);
134 if (endpoint != null && !reusable) {
135 closeEndpoint(endpoint);
136 }
137 }
138
139 @Override
140 public void discardEndpoint() {
141 final Endpoint endpoint = sessionRef.getAndSet(null);
142 if (endpoint != null) {
143 closeEndpoint(endpoint);
144 }
145 }
146
147 @Override
148 public boolean validateConnection() {
149 if (reusable) {
150 final Endpoint endpoint = sessionRef.get();
151 return endpoint != null && endpoint.session.isOpen();
152 }
153 final Endpoint endpoint = sessionRef.getAndSet(null);
154 if (endpoint != null) {
155 closeEndpoint(endpoint);
156 }
157 return false;
158 }
159
160 @Override
161 public boolean isEndpointConnected() {
162 final Endpoint endpoint = sessionRef.get();
163 return endpoint != null && endpoint.session.isOpen();
164 }
165
166
167 Endpoint ensureValid() {
168 final Endpoint endpoint = sessionRef.get();
169 if (endpoint == null) {
170 throw new IllegalStateException("I/O session not acquired / already released");
171 }
172 return endpoint;
173 }
174
175 @Override
176 public Cancellable connectEndpoint(
177 final HttpClientContext context,
178 final FutureCallback<AsyncExecRuntime> callback) {
179 final Endpoint endpoint = ensureValid();
180 if (endpoint.session.isOpen()) {
181 callback.completed(this);
182 return Operations.nonCancellable();
183 }
184 final HttpHost target = endpoint.target;
185 final RequestConfig requestConfig = context.getRequestConfig();
186 @SuppressWarnings("deprecation")
187 final Timeout connectTimeout = requestConfig.getConnectTimeout();
188 if (log.isDebugEnabled()) {
189 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
190 }
191 return Operations.cancellable(connPool.getSession(target, connectTimeout,
192 new FutureCallback<IOSession>() {
193
194 @Override
195 public void completed(final IOSession ioSession) {
196 sessionRef.set(new Endpoint(target, ioSession));
197 reusable = true;
198 if (log.isDebugEnabled()) {
199 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
200 }
201 callback.completed(InternalH2AsyncExecRuntime.this);
202 }
203
204 @Override
205 public void failed(final Exception ex) {
206 callback.failed(ex);
207 }
208
209 @Override
210 public void cancelled() {
211 callback.cancelled();
212 }
213
214 }));
215
216 }
217
218 @Override
219 public void disconnectEndpoint() {
220 final Endpoint endpoint = sessionRef.get();
221 if (endpoint != null) {
222 endpoint.session.close(CloseMode.GRACEFUL);
223 }
224 }
225
226 @Override
227 public void upgradeTls(final HttpClientContext context) {
228 throw new UnsupportedOperationException();
229 }
230
231 @Override
232 public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
233 throw new UnsupportedOperationException();
234 }
235
236 @Override
237 public Cancellable execute(
238 final String id,
239 final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
240 final ComplexCancellable complexCancellable = new ComplexCancellable();
241 final Endpoint endpoint = ensureValid();
242 final IOSession session = endpoint.session;
243 if (session.isOpen()) {
244 if (log.isDebugEnabled()) {
245 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
246 }
247 context.setProtocolVersion(HttpVersion.HTTP_2);
248 session.enqueue(
249 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
250 Command.Priority.NORMAL);
251 } else {
252 final HttpHost target = endpoint.target;
253 final RequestConfig requestConfig = context.getRequestConfig();
254 @SuppressWarnings("deprecation")
255 final Timeout connectTimeout = requestConfig.getConnectTimeout();
256 connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
257
258 @Override
259 public void completed(final IOSession ioSession) {
260 sessionRef.set(new Endpoint(target, ioSession));
261 reusable = true;
262 if (log.isDebugEnabled()) {
263 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
264 }
265 context.setProtocolVersion(HttpVersion.HTTP_2);
266 session.enqueue(
267 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
268 Command.Priority.NORMAL);
269 }
270
271 @Override
272 public void failed(final Exception ex) {
273 exchangeHandler.failed(ex);
274 }
275
276 @Override
277 public void cancelled() {
278 exchangeHandler.failed(new InterruptedIOException());
279 }
280
281 });
282 }
283 return complexCancellable;
284 }
285
286 @Override
287 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
288 throw new UnsupportedOperationException();
289 }
290
291 @Override
292 public void markConnectionNonReusable() {
293 reusable = false;
294 }
295
296 static class Endpoint implements Identifiable {
297
298 final HttpHost target;
299 final IOSession session;
300
301 Endpoint(final HttpHost target, final IOSession session) {
302 this.target = target;
303 this.session = session;
304 }
305
306 @Override
307 public String getId() {
308 return session.getId();
309 }
310
311 }
312
313 @Override
314 public AsyncExecRuntime fork() {
315 return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
316 }
317
318 }