1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.ConnectionReuseStrategy;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HeaderElements;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpHeaders;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.ProtocolException;
45 import org.apache.hc.core5.http.ProtocolVersion;
46 import org.apache.hc.core5.http.UnsupportedHttpVersionException;
47 import org.apache.hc.core5.http.config.Http1Config;
48 import org.apache.hc.core5.http.message.StatusLine;
49 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50 import org.apache.hc.core5.http.nio.CapacityChannel;
51 import org.apache.hc.core5.http.nio.DataStreamChannel;
52 import org.apache.hc.core5.http.nio.RequestChannel;
53 import org.apache.hc.core5.http.nio.ResourceHolder;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.http.protocol.HttpCoreContext;
56 import org.apache.hc.core5.http.protocol.HttpProcessor;
57 import org.apache.hc.core5.util.Timeout;
58
59 class ClientHttp1StreamHandler implements ResourceHolder {
60
61 private final Http1StreamChannel<HttpRequest> outputChannel;
62 private final DataStreamChannel internalDataChannel;
63 private final HttpProcessor httpProcessor;
64 private final Http1Config http1Config;
65 private final ConnectionReuseStrategy connectionReuseStrategy;
66 private final AsyncClientExchangeHandler exchangeHandler;
67 private final HttpCoreContext context;
68 private final AtomicBoolean requestCommitted;
69 private final AtomicBoolean done;
70
71 private volatile boolean keepAlive;
72 private volatile Timeout timeout;
73 private volatile HttpRequest committedRequest;
74 private volatile MessageState requestState;
75 private volatile MessageState responseState;
76
77 ClientHttp1StreamHandler(
78 final Http1StreamChannel<HttpRequest> outputChannel,
79 final HttpProcessor httpProcessor,
80 final Http1Config http1Config,
81 final ConnectionReuseStrategy connectionReuseStrategy,
82 final AsyncClientExchangeHandler exchangeHandler,
83 final HttpCoreContext context) {
84 this.outputChannel = outputChannel;
85 this.internalDataChannel = new DataStreamChannel() {
86
87 @Override
88 public void requestOutput() {
89 outputChannel.requestOutput();
90 }
91
92 @Override
93 public void endStream(final List<? extends Header> trailers) throws IOException {
94 outputChannel.complete(trailers);
95 requestState = MessageState.COMPLETE;
96 }
97
98 @Override
99 public int write(final ByteBuffer src) throws IOException {
100 return outputChannel.write(src);
101 }
102
103 @Override
104 public void endStream() throws IOException {
105 endStream(null);
106 }
107
108 };
109
110 this.httpProcessor = httpProcessor;
111 this.http1Config = http1Config;
112 this.connectionReuseStrategy = connectionReuseStrategy;
113 this.exchangeHandler = exchangeHandler;
114 this.context = context;
115 this.requestCommitted = new AtomicBoolean(false);
116 this.done = new AtomicBoolean(false);
117 this.keepAlive = true;
118 this.requestState = MessageState.IDLE;
119 this.responseState = MessageState.HEADERS;
120 }
121
122 boolean isResponseFinal() {
123 return responseState == MessageState.COMPLETE;
124 }
125
126 boolean isCompleted() {
127 return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
128 }
129
130 String getRequestMethod() {
131 return committedRequest != null ? committedRequest.getMethod() : null;
132 }
133
134 boolean isOutputReady() {
135 switch (requestState) {
136 case IDLE:
137 case ACK:
138 return true;
139 case BODY:
140 return exchangeHandler.available() > 0;
141 default:
142 return false;
143 }
144 }
145
146 private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
147 if (requestCommitted.compareAndSet(false, true)) {
148 final ProtocolVersion transportVersion = request.getVersion();
149 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
150 throw new UnsupportedHttpVersionException(transportVersion);
151 }
152 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
153 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154
155 httpProcessor.process(request, entityDetails, context);
156
157 final boolean endStream = entityDetails == null;
158 if (endStream) {
159 outputChannel.submit(request, endStream, FlushMode.IMMEDIATE);
160 committedRequest = request;
161 requestState = MessageState.COMPLETE;
162 } else {
163 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
164 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
165 outputChannel.submit(request, endStream, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
166 committedRequest = request;
167 if (expectContinue) {
168 requestState = MessageState.ACK;
169 timeout = outputChannel.getSocketTimeout();
170 outputChannel.setSocketTimeout(http1Config.getWaitForContinueTimeout());
171 } else {
172 requestState = MessageState.BODY;
173 exchangeHandler.produce(internalDataChannel);
174 }
175 }
176 } else {
177 throw new HttpException("Request already committed");
178 }
179 }
180
181 void produceOutput() throws HttpException, IOException {
182 switch (requestState) {
183 case IDLE:
184 requestState = MessageState.HEADERS;
185 exchangeHandler.produceRequest(new RequestChannel() {
186
187 @Override
188 public void sendRequest(
189 final HttpRequest request, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
190 commitRequest(request, entityDetails);
191 }
192
193 }, context);
194 break;
195 case ACK:
196 outputChannel.suspendOutput();
197 break;
198 case BODY:
199 exchangeHandler.produce(internalDataChannel);
200 break;
201 }
202 }
203
204 void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
205 if (done.get() || responseState != MessageState.HEADERS) {
206 throw new ProtocolException("Unexpected message head");
207 }
208 final ProtocolVersion transportVersion = response.getVersion();
209 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
210 throw new UnsupportedHttpVersionException(transportVersion);
211 }
212
213 final int status = response.getCode();
214 if (status < HttpStatus.SC_INFORMATIONAL) {
215 throw new ProtocolException("Invalid response: " + new StatusLine(response));
216 }
217 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
218 exchangeHandler.consumeInformation(response, context);
219 } else {
220 if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
221 keepAlive = false;
222 }
223 }
224 if (requestState == MessageState.ACK) {
225 if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
226 outputChannel.setSocketTimeout(timeout);
227 requestState = MessageState.BODY;
228 if (status < HttpStatus.SC_CLIENT_ERROR) {
229 exchangeHandler.produce(internalDataChannel);
230 }
231 }
232 }
233 if (status < HttpStatus.SC_SUCCESS) {
234 return;
235 }
236 if (requestState == MessageState.BODY) {
237 if (status >= HttpStatus.SC_CLIENT_ERROR) {
238 requestState = MessageState.COMPLETE;
239 if (!outputChannel.abortGracefully()) {
240 keepAlive = false;
241 }
242 }
243 }
244
245 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
246 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
247 httpProcessor.process(response, entityDetails, context);
248
249 if (entityDetails == null && !keepAlive) {
250 outputChannel.close();
251 }
252
253 exchangeHandler.consumeResponse(response, entityDetails, context);
254 if (entityDetails == null) {
255 responseState = MessageState.COMPLETE;
256 } else {
257 responseState = MessageState.BODY;
258 }
259 }
260
261 void consumeData(final ByteBuffer src) throws HttpException, IOException {
262 if (done.get() || responseState != MessageState.BODY) {
263 throw new ProtocolException("Unexpected message data");
264 }
265 exchangeHandler.consume(src);
266 }
267
268 void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
269 exchangeHandler.updateCapacity(capacityChannel);
270 }
271
272 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
273 if (done.get() || responseState != MessageState.BODY) {
274 throw new ProtocolException("Unexpected message data");
275 }
276 if (!keepAlive) {
277 outputChannel.close();
278 }
279 responseState = MessageState.COMPLETE;
280 exchangeHandler.streamEnd(trailers);
281 }
282
283 boolean handleTimeout() {
284 if (requestState == MessageState.ACK) {
285 requestState = MessageState.BODY;
286 outputChannel.setSocketTimeout(timeout);
287 outputChannel.requestOutput();
288 return true;
289 }
290 return false;
291 }
292
293 void failed(final Exception cause) {
294 if (!done.get()) {
295 exchangeHandler.failed(cause);
296 }
297 }
298
299 @Override
300 public void releaseResources() {
301 if (done.compareAndSet(false, true)) {
302 responseState = MessageState.COMPLETE;
303 requestState = MessageState.COMPLETE;
304 exchangeHandler.releaseResources();
305 }
306 }
307
308 void appendState(final StringBuilder buf) {
309 buf.append("requestState=").append(requestState)
310 .append(", responseState=").append(responseState)
311 .append(", responseCommitted=").append(requestCommitted)
312 .append(", keepAlive=").append(keepAlive)
313 .append(", done=").append(done);
314 }
315
316 @Override
317 public String toString() {
318 final StringBuilder buf = new StringBuilder();
319 buf.append("[");
320 appendState(buf);
321 buf.append("]");
322 return buf.toString();
323 }
324
325 }
326