1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.async.AsyncExecCallback;
36 import org.apache.hc.client5.http.async.AsyncExecChain;
37 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
38 import org.apache.hc.client5.http.async.AsyncExecRuntime;
39 import org.apache.hc.client5.http.protocol.HttpClientContext;
40 import org.apache.hc.core5.annotation.Contract;
41 import org.apache.hc.core5.annotation.Internal;
42 import org.apache.hc.core5.annotation.ThreadingBehavior;
43 import org.apache.hc.core5.concurrent.CancellableDependency;
44 import org.apache.hc.core5.http.EntityDetails;
45 import org.apache.hc.core5.http.Header;
46 import org.apache.hc.core5.http.HttpException;
47 import org.apache.hc.core5.http.HttpRequest;
48 import org.apache.hc.core5.http.HttpResponse;
49 import org.apache.hc.core5.http.message.RequestLine;
50 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
51 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
52 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
53 import org.apache.hc.core5.http.nio.CapacityChannel;
54 import org.apache.hc.core5.http.nio.DataStreamChannel;
55 import org.apache.hc.core5.http.nio.RequestChannel;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60
61
62
63
64
65
66
67 @Contract(threading = ThreadingBehavior.STATELESS)
68 @Internal
69 public class H2AsyncMainClientExec implements AsyncExecChainHandler {
70
71 private static final Logger LOG = LoggerFactory.getLogger(H2AsyncMainClientExec.class);
72
73 @Override
74 public void execute(
75 final HttpRequest request,
76 final AsyncEntityProducer entityProducer,
77 final AsyncExecChain.Scope scope,
78 final AsyncExecChain chain,
79 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
80 final String exchangeId = scope.exchangeId;
81 final CancellableDependency operation = scope.cancellableDependency;
82 final HttpClientContext clientContext = scope.clientContext;
83 final AsyncExecRuntime execRuntime = scope.execRuntime;
84
85 if (LOG.isDebugEnabled()) {
86 LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
87 }
88
89 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
90
91 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
92
93 @Override
94 public void releaseResources() {
95 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
96 if (entityConsumer != null) {
97 entityConsumer.releaseResources();
98 }
99 }
100
101 @Override
102 public void failed(final Exception cause) {
103 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
104 if (entityConsumer != null) {
105 entityConsumer.releaseResources();
106 }
107 execRuntime.markConnectionNonReusable();
108 asyncExecCallback.failed(cause);
109 }
110
111 @Override
112 public void cancel() {
113 failed(new InterruptedIOException());
114 }
115
116 @Override
117 public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
118 channel.sendRequest(request, entityProducer, context);
119 }
120
121 @Override
122 public int available() {
123 return entityProducer.available();
124 }
125
126 @Override
127 public void produce(final DataStreamChannel channel) throws IOException {
128 entityProducer.produce(channel);
129 }
130
131 @Override
132 public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
133 }
134
135 @Override
136 public void consumeResponse(
137 final HttpResponse response,
138 final EntityDetails entityDetails,
139 final HttpContext context) throws HttpException, IOException {
140 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
141 if (entityDetails == null) {
142 execRuntime.validateConnection();
143 asyncExecCallback.completed();
144 }
145 }
146
147 @Override
148 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
149 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
150 if (entityConsumer != null) {
151 entityConsumer.updateCapacity(capacityChannel);
152 } else {
153 capacityChannel.update(Integer.MAX_VALUE);
154 }
155 }
156
157 @Override
158 public void consume(final ByteBuffer src) throws IOException {
159 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
160 if (entityConsumer != null) {
161 entityConsumer.consume(src);
162 }
163 }
164
165 @Override
166 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
167 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
168 if (entityConsumer != null) {
169 entityConsumer.streamEnd(trailers);
170 } else {
171 execRuntime.validateConnection();
172 }
173 asyncExecCallback.completed();
174 }
175
176 };
177
178 if (LOG.isDebugEnabled()) {
179 operation.setDependency(execRuntime.execute(
180 exchangeId,
181 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
182 clientContext));
183 } else {
184 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
185 }
186 }
187
188 }