1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.ConnectionReuseStrategy;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HeaderElements;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpHeaders;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.ProtocolException;
45 import org.apache.hc.core5.http.ProtocolVersion;
46 import org.apache.hc.core5.http.UnsupportedHttpVersionException;
47 import org.apache.hc.core5.http.config.Http1Config;
48 import org.apache.hc.core5.http.message.StatusLine;
49 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50 import org.apache.hc.core5.http.nio.CapacityChannel;
51 import org.apache.hc.core5.http.nio.DataStreamChannel;
52 import org.apache.hc.core5.http.nio.ResourceHolder;
53 import org.apache.hc.core5.http.protocol.HttpCoreContext;
54 import org.apache.hc.core5.http.protocol.HttpProcessor;
55 import org.apache.hc.core5.util.Timeout;
56
57 class ClientHttp1StreamHandler implements ResourceHolder {
58
59 private final Http1StreamChannel<HttpRequest> outputChannel;
60 private final DataStreamChannel internalDataChannel;
61 private final HttpProcessor httpProcessor;
62 private final Http1Config http1Config;
63 private final ConnectionReuseStrategy connectionReuseStrategy;
64 private final AsyncClientExchangeHandler exchangeHandler;
65 private final HttpCoreContext context;
66 private final AtomicBoolean requestCommitted;
67 private final AtomicBoolean done;
68
69 private volatile boolean keepAlive;
70 private volatile Timeout timeout;
71 private volatile HttpRequest committedRequest;
72 private volatile MessageState requestState;
73 private volatile MessageState responseState;
74
75 ClientHttp1StreamHandler(
76 final Http1StreamChannel<HttpRequest> outputChannel,
77 final HttpProcessor httpProcessor,
78 final Http1Config http1Config,
79 final ConnectionReuseStrategy connectionReuseStrategy,
80 final AsyncClientExchangeHandler exchangeHandler,
81 final HttpCoreContext context) {
82 this.outputChannel = outputChannel;
83 this.internalDataChannel = new DataStreamChannel() {
84
85 @Override
86 public void requestOutput() {
87 outputChannel.requestOutput();
88 }
89
90 @Override
91 public void endStream(final List<? extends Header> trailers) throws IOException {
92 outputChannel.complete(trailers);
93 requestState = MessageState.COMPLETE;
94 }
95
96 @Override
97 public int write(final ByteBuffer src) throws IOException {
98 return outputChannel.write(src);
99 }
100
101 @Override
102 public void endStream() throws IOException {
103 endStream(null);
104 }
105
106 };
107
108 this.httpProcessor = httpProcessor;
109 this.http1Config = http1Config;
110 this.connectionReuseStrategy = connectionReuseStrategy;
111 this.exchangeHandler = exchangeHandler;
112 this.context = context;
113 this.requestCommitted = new AtomicBoolean(false);
114 this.done = new AtomicBoolean(false);
115 this.keepAlive = true;
116 this.requestState = MessageState.IDLE;
117 this.responseState = MessageState.HEADERS;
118 }
119
120 boolean isResponseFinal() {
121 return responseState == MessageState.COMPLETE;
122 }
123
124 boolean isCompleted() {
125 return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
126 }
127
128 String getRequestMethod() {
129 return committedRequest != null ? committedRequest.getMethod() : null;
130 }
131
132 boolean isOutputReady() {
133 switch (requestState) {
134 case IDLE:
135 case ACK:
136 return true;
137 case BODY:
138 return exchangeHandler.available() > 0;
139 default:
140 return false;
141 }
142 }
143
144 private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
145 if (requestCommitted.compareAndSet(false, true)) {
146 final ProtocolVersion transportVersion = request.getVersion();
147 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
148 throw new UnsupportedHttpVersionException(transportVersion);
149 }
150 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
151 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
152
153 httpProcessor.process(request, entityDetails, context);
154
155 final boolean endStream = entityDetails == null;
156 if (endStream) {
157 outputChannel.submit(request, true, FlushMode.IMMEDIATE);
158 committedRequest = request;
159 requestState = MessageState.COMPLETE;
160 } else {
161 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
162 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
163 outputChannel.submit(request, false, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
164 committedRequest = request;
165 if (expectContinue) {
166 requestState = MessageState.ACK;
167 timeout = outputChannel.getSocketTimeout();
168 outputChannel.setSocketTimeout(http1Config.getWaitForContinueTimeout());
169 } else {
170 requestState = MessageState.BODY;
171 exchangeHandler.produce(internalDataChannel);
172 }
173 }
174 } else {
175 throw new HttpException("Request already committed");
176 }
177 }
178
179 void produceOutput() throws HttpException, IOException {
180 switch (requestState) {
181 case IDLE:
182 requestState = MessageState.HEADERS;
183 exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
184 break;
185 case ACK:
186 outputChannel.suspendOutput();
187 break;
188 case BODY:
189 exchangeHandler.produce(internalDataChannel);
190 break;
191 }
192 }
193
194 void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
195 if (done.get() || responseState != MessageState.HEADERS) {
196 throw new ProtocolException("Unexpected message head");
197 }
198 final ProtocolVersion transportVersion = response.getVersion();
199 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
200 throw new UnsupportedHttpVersionException(transportVersion);
201 }
202
203 final int status = response.getCode();
204 if (status < HttpStatus.SC_INFORMATIONAL) {
205 throw new ProtocolException("Invalid response: " + new StatusLine(response));
206 }
207 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
208 exchangeHandler.consumeInformation(response, context);
209 } else {
210 if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
211 keepAlive = false;
212 }
213 }
214 if (requestState == MessageState.ACK) {
215 if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
216 outputChannel.setSocketTimeout(timeout);
217 requestState = MessageState.BODY;
218 if (status < HttpStatus.SC_CLIENT_ERROR) {
219 exchangeHandler.produce(internalDataChannel);
220 }
221 }
222 }
223 if (status < HttpStatus.SC_SUCCESS) {
224 return;
225 }
226 if (requestState == MessageState.BODY) {
227 if (status >= HttpStatus.SC_CLIENT_ERROR) {
228 requestState = MessageState.COMPLETE;
229 if (!outputChannel.abortGracefully()) {
230 keepAlive = false;
231 }
232 }
233 }
234
235 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
236 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
237 httpProcessor.process(response, entityDetails, context);
238
239 if (entityDetails == null && !keepAlive) {
240 outputChannel.close();
241 }
242
243 exchangeHandler.consumeResponse(response, entityDetails, context);
244 if (entityDetails == null) {
245 responseState = MessageState.COMPLETE;
246 } else {
247 responseState = MessageState.BODY;
248 }
249 }
250
251 void consumeData(final ByteBuffer src) throws HttpException, IOException {
252 if (done.get() || responseState != MessageState.BODY) {
253 throw new ProtocolException("Unexpected message data");
254 }
255 exchangeHandler.consume(src);
256 }
257
258 void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
259 exchangeHandler.updateCapacity(capacityChannel);
260 }
261
262 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
263 if (done.get() || responseState != MessageState.BODY) {
264 throw new ProtocolException("Unexpected message data");
265 }
266 if (!keepAlive) {
267 outputChannel.close();
268 }
269 responseState = MessageState.COMPLETE;
270 exchangeHandler.streamEnd(trailers);
271 }
272
273 boolean handleTimeout() {
274 if (requestState == MessageState.ACK) {
275 requestState = MessageState.BODY;
276 outputChannel.setSocketTimeout(timeout);
277 outputChannel.requestOutput();
278 return true;
279 }
280 return false;
281 }
282
283 void failed(final Exception cause) {
284 if (!done.get()) {
285 exchangeHandler.failed(cause);
286 }
287 }
288
289 @Override
290 public void releaseResources() {
291 if (done.compareAndSet(false, true)) {
292 responseState = MessageState.COMPLETE;
293 requestState = MessageState.COMPLETE;
294 exchangeHandler.releaseResources();
295 }
296 }
297
298 void appendState(final StringBuilder buf) {
299 buf.append("requestState=").append(requestState)
300 .append(", responseState=").append(responseState)
301 .append(", responseCommitted=").append(requestCommitted)
302 .append(", keepAlive=").append(keepAlive)
303 .append(", done=").append(done);
304 }
305
306 @Override
307 public String toString() {
308 final StringBuilder buf = new StringBuilder();
309 buf.append("[");
310 appendState(buf);
311 buf.append("]");
312 return buf.toString();
313 }
314
315 }
316