1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.HttpRoute;
36 import org.apache.hc.client5.http.async.AsyncExecCallback;
37 import org.apache.hc.client5.http.async.AsyncExecChain;
38 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
39 import org.apache.hc.client5.http.async.AsyncExecRuntime;
40 import org.apache.hc.client5.http.protocol.HttpClientContext;
41 import org.apache.hc.core5.annotation.Contract;
42 import org.apache.hc.core5.annotation.Internal;
43 import org.apache.hc.core5.annotation.ThreadingBehavior;
44 import org.apache.hc.core5.concurrent.CancellableDependency;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpRequest;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.message.RequestLine;
51 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
52 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
53 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
54 import org.apache.hc.core5.http.nio.CapacityChannel;
55 import org.apache.hc.core5.http.nio.DataStreamChannel;
56 import org.apache.hc.core5.http.nio.RequestChannel;
57 import org.apache.hc.core5.http.protocol.HttpContext;
58 import org.apache.hc.core5.http.protocol.HttpProcessor;
59 import org.apache.hc.core5.util.Args;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63
64
65
66
67
68
69
70 @Contract(threading = ThreadingBehavior.STATELESS)
71 @Internal
72 public class H2AsyncMainClientExec implements AsyncExecChainHandler {
73
74 private static final Logger LOG = LoggerFactory.getLogger(H2AsyncMainClientExec.class);
75
76 private final HttpProcessor httpProcessor;
77
78 H2AsyncMainClientExec(final HttpProcessor httpProcessor) {
79 this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
80 }
81
82 @Override
83 public void execute(
84 final HttpRequest request,
85 final AsyncEntityProducer entityProducer,
86 final AsyncExecChain.Scope scope,
87 final AsyncExecChain chain,
88 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
89 final String exchangeId = scope.exchangeId;
90 final HttpRoute route = scope.route;
91 final CancellableDependency operation = scope.cancellableDependency;
92 final HttpClientContext clientContext = scope.clientContext;
93 final AsyncExecRuntime execRuntime = scope.execRuntime;
94
95 if (LOG.isDebugEnabled()) {
96 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
97 }
98
99 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
100
101 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
102
103 @Override
104 public void releaseResources() {
105 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
106 if (entityConsumer != null) {
107 entityConsumer.releaseResources();
108 }
109 }
110
111 @Override
112 public void failed(final Exception cause) {
113 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
114 if (entityConsumer != null) {
115 entityConsumer.releaseResources();
116 }
117 execRuntime.markConnectionNonReusable();
118 asyncExecCallback.failed(cause);
119 }
120
121 @Override
122 public void cancel() {
123 failed(new InterruptedIOException());
124 }
125
126 @Override
127 public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
128 clientContext.setRequest(request);
129 clientContext.setRoute(route);
130 httpProcessor.process(request, entityProducer, clientContext);
131
132 channel.sendRequest(request, entityProducer, context);
133 }
134
135 @Override
136 public int available() {
137 return entityProducer.available();
138 }
139
140 @Override
141 public void produce(final DataStreamChannel channel) throws IOException {
142 entityProducer.produce(channel);
143 }
144
145 @Override
146 public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
147 }
148
149 @Override
150 public void consumeResponse(
151 final HttpResponse response,
152 final EntityDetails entityDetails,
153 final HttpContext context) throws HttpException, IOException {
154
155 clientContext.setResponse(response);
156 httpProcessor.process(response, entityDetails, clientContext);
157
158 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
159 if (entityDetails == null) {
160 execRuntime.validateConnection();
161 asyncExecCallback.completed();
162 }
163 }
164
165 @Override
166 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
167 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
168 if (entityConsumer != null) {
169 entityConsumer.updateCapacity(capacityChannel);
170 } else {
171 capacityChannel.update(Integer.MAX_VALUE);
172 }
173 }
174
175 @Override
176 public void consume(final ByteBuffer src) throws IOException {
177 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
178 if (entityConsumer != null) {
179 entityConsumer.consume(src);
180 }
181 }
182
183 @Override
184 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
185 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
186 if (entityConsumer != null) {
187 entityConsumer.streamEnd(trailers);
188 } else {
189 execRuntime.validateConnection();
190 }
191 asyncExecCallback.completed();
192 }
193
194 };
195
196 if (LOG.isDebugEnabled()) {
197 operation.setDependency(execRuntime.execute(
198 exchangeId,
199 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
200 clientContext));
201 } else {
202 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
203 }
204 }
205
206 }