1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.HttpRoute;
36 import org.apache.hc.client5.http.async.AsyncExecCallback;
37 import org.apache.hc.client5.http.async.AsyncExecChain;
38 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
39 import org.apache.hc.client5.http.async.AsyncExecRuntime;
40 import org.apache.hc.client5.http.protocol.HttpClientContext;
41 import org.apache.hc.core5.annotation.Contract;
42 import org.apache.hc.core5.annotation.Internal;
43 import org.apache.hc.core5.annotation.ThreadingBehavior;
44 import org.apache.hc.core5.concurrent.CancellableDependency;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpRequest;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.message.RequestLine;
51 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
52 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
53 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
54 import org.apache.hc.core5.http.nio.CapacityChannel;
55 import org.apache.hc.core5.http.nio.DataStreamChannel;
56 import org.apache.hc.core5.http.nio.RequestChannel;
57 import org.apache.hc.core5.http.protocol.HttpContext;
58 import org.apache.hc.core5.http.protocol.HttpCoreContext;
59 import org.apache.hc.core5.http.protocol.HttpProcessor;
60 import org.apache.hc.core5.util.Args;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64
65
66
67
68
69
70
71 @Contract(threading = ThreadingBehavior.STATELESS)
72 @Internal
73 public class H2AsyncMainClientExec implements AsyncExecChainHandler {
74
75 private static final Logger LOG = LoggerFactory.getLogger(H2AsyncMainClientExec.class);
76
77 private final HttpProcessor httpProcessor;
78
79 H2AsyncMainClientExec(final HttpProcessor httpProcessor) {
80 this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
81 }
82
83 @Override
84 public void execute(
85 final HttpRequest request,
86 final AsyncEntityProducer entityProducer,
87 final AsyncExecChain.Scope scope,
88 final AsyncExecChain chain,
89 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
90 final String exchangeId = scope.exchangeId;
91 final HttpRoute route = scope.route;
92 final CancellableDependency operation = scope.cancellableDependency;
93 final HttpClientContext clientContext = scope.clientContext;
94 final AsyncExecRuntime execRuntime = scope.execRuntime;
95
96 if (LOG.isDebugEnabled()) {
97 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
98 }
99
100 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
101
102 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
103
104 @Override
105 public void releaseResources() {
106 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
107 if (entityConsumer != null) {
108 entityConsumer.releaseResources();
109 }
110 }
111
112 @Override
113 public void failed(final Exception cause) {
114 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
115 if (entityConsumer != null) {
116 entityConsumer.releaseResources();
117 }
118 execRuntime.markConnectionNonReusable();
119 asyncExecCallback.failed(cause);
120 }
121
122 @Override
123 public void cancel() {
124 failed(new InterruptedIOException());
125 }
126
127 @Override
128 public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
129
130 clientContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
131 clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
132 httpProcessor.process(request, entityProducer, clientContext);
133
134 channel.sendRequest(request, entityProducer, context);
135 }
136
137 @Override
138 public int available() {
139 return entityProducer.available();
140 }
141
142 @Override
143 public void produce(final DataStreamChannel channel) throws IOException {
144 entityProducer.produce(channel);
145 }
146
147 @Override
148 public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
149 }
150
151 @Override
152 public void consumeResponse(
153 final HttpResponse response,
154 final EntityDetails entityDetails,
155 final HttpContext context) throws HttpException, IOException {
156
157 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
158 httpProcessor.process(response, entityDetails, clientContext);
159
160 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
161 if (entityDetails == null) {
162 execRuntime.validateConnection();
163 asyncExecCallback.completed();
164 }
165 }
166
167 @Override
168 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
169 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
170 if (entityConsumer != null) {
171 entityConsumer.updateCapacity(capacityChannel);
172 } else {
173 capacityChannel.update(Integer.MAX_VALUE);
174 }
175 }
176
177 @Override
178 public void consume(final ByteBuffer src) throws IOException {
179 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
180 if (entityConsumer != null) {
181 entityConsumer.consume(src);
182 }
183 }
184
185 @Override
186 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
187 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
188 if (entityConsumer != null) {
189 entityConsumer.streamEnd(trailers);
190 } else {
191 execRuntime.validateConnection();
192 }
193 asyncExecCallback.completed();
194 }
195
196 };
197
198 if (LOG.isDebugEnabled()) {
199 operation.setDependency(execRuntime.execute(
200 exchangeId,
201 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
202 clientContext));
203 } else {
204 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
205 }
206 }
207
208 }