1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.ConnectionReuseStrategy;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HeaderElements;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpHeaders;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.Method;
45 import org.apache.hc.core5.http.MisdirectedRequestException;
46 import org.apache.hc.core5.http.ProtocolException;
47 import org.apache.hc.core5.http.ProtocolVersion;
48 import org.apache.hc.core5.http.UnsupportedHttpVersionException;
49 import org.apache.hc.core5.http.impl.ServerSupport;
50 import org.apache.hc.core5.http.message.BasicHttpResponse;
51 import org.apache.hc.core5.http.nio.AsyncPushProducer;
52 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
53 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
54 import org.apache.hc.core5.http.nio.CapacityChannel;
55 import org.apache.hc.core5.http.nio.DataStreamChannel;
56 import org.apache.hc.core5.http.nio.HandlerFactory;
57 import org.apache.hc.core5.http.nio.ResourceHolder;
58 import org.apache.hc.core5.http.nio.ResponseChannel;
59 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
60 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
61 import org.apache.hc.core5.http.protocol.HttpContext;
62 import org.apache.hc.core5.http.protocol.HttpCoreContext;
63 import org.apache.hc.core5.http.protocol.HttpProcessor;
64
65 class ServerHttp1StreamHandler implements ResourceHolder {
66
67 private final Http1StreamChannel<HttpResponse> outputChannel;
68 private final DataStreamChannel internalDataChannel;
69 private final ResponseChannel responseChannel;
70 private final HttpProcessor httpProcessor;
71 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
72 private final ConnectionReuseStrategy connectionReuseStrategy;
73 private final HttpCoreContext context;
74 private final AtomicBoolean responseCommitted;
75 private final AtomicBoolean done;
76
77 private volatile boolean keepAlive;
78 private volatile AsyncServerExchangeHandler exchangeHandler;
79 private volatile HttpRequest receivedRequest;
80 private volatile MessageState requestState;
81 private volatile MessageState responseState;
82
83 ServerHttp1StreamHandler(
84 final Http1StreamChannel<HttpResponse> outputChannel,
85 final HttpProcessor httpProcessor,
86 final ConnectionReuseStrategy connectionReuseStrategy,
87 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
88 final HttpCoreContext context) {
89 this.outputChannel = outputChannel;
90 this.internalDataChannel = new DataStreamChannel() {
91
92 @Override
93 public void requestOutput() {
94 outputChannel.requestOutput();
95 }
96
97 @Override
98 public void endStream(final List<? extends Header> trailers) throws IOException {
99 outputChannel.complete(trailers);
100 if (!keepAlive) {
101 outputChannel.close();
102 }
103 responseState = MessageState.COMPLETE;
104 }
105
106 @Override
107 public int write(final ByteBuffer src) throws IOException {
108 return outputChannel.write(src);
109 }
110
111 @Override
112 public void endStream() throws IOException {
113 endStream(null);
114 }
115
116 };
117
118 this.responseChannel = new ResponseChannel() {
119
120 @Override
121 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
122 commitInformation(response);
123 }
124
125 @Override
126 public void sendResponse(
127 final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
128 ServerSupport.validateResponse(response, responseEntityDetails);
129 commitResponse(response, responseEntityDetails);
130 }
131
132 @Override
133 public void pushPromise(
134 final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
135 commitPromise();
136 }
137
138 @Override
139 public String toString() {
140 return super.toString() + " " + ServerHttp1StreamHandler.this;
141 }
142
143 };
144
145 this.httpProcessor = httpProcessor;
146 this.connectionReuseStrategy = connectionReuseStrategy;
147 this.exchangeHandlerFactory = exchangeHandlerFactory;
148 this.context = context;
149 this.responseCommitted = new AtomicBoolean(false);
150 this.done = new AtomicBoolean(false);
151 this.keepAlive = true;
152 this.requestState = MessageState.HEADERS;
153 this.responseState = MessageState.IDLE;
154 }
155
156 private void commitResponse(
157 final HttpResponse response,
158 final EntityDetails responseEntityDetails) throws HttpException, IOException {
159 if (responseCommitted.compareAndSet(false, true)) {
160
161 final ProtocolVersion transportVersion = response.getVersion();
162 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
163 throw new UnsupportedHttpVersionException(transportVersion);
164 }
165
166 final int status = response.getCode();
167 if (status < HttpStatus.SC_SUCCESS) {
168 throw new HttpException("Invalid response: " + status);
169 }
170
171 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
172 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
173 httpProcessor.process(response, responseEntityDetails, context);
174
175 final boolean endStream = responseEntityDetails == null ||
176 (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
177
178 if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
179 keepAlive = false;
180 }
181
182 outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
183 if (endStream) {
184 if (!keepAlive) {
185 outputChannel.close();
186 }
187 responseState = MessageState.COMPLETE;
188 } else {
189 responseState = MessageState.BODY;
190 exchangeHandler.produce(internalDataChannel);
191 }
192 } else {
193 throw new HttpException("Response already committed");
194 }
195 }
196
197 private void commitInformation(final HttpResponse response) throws IOException, HttpException {
198 if (responseCommitted.get()) {
199 throw new HttpException("Response already committed");
200 }
201 final int status = response.getCode();
202 if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
203 throw new HttpException("Invalid intermediate response: " + status);
204 }
205 outputChannel.submit(response, true, FlushMode.IMMEDIATE);
206 }
207
208 private void commitPromise() throws HttpException {
209 throw new HttpException("HTTP/1.1 does not support server push");
210 }
211
212 void activateChannel() throws IOException, HttpException {
213 outputChannel.activate();
214 }
215
216 boolean isResponseFinal() {
217 return responseState == MessageState.COMPLETE;
218 }
219
220 boolean keepAlive() {
221 return keepAlive;
222 }
223
224 boolean isCompleted() {
225 return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
226 }
227
228 void terminateExchange(final HttpException ex) throws HttpException, IOException {
229 if (done.get() || requestState != MessageState.HEADERS) {
230 throw new ProtocolException("Unexpected message head");
231 }
232 receivedRequest = null;
233 requestState = MessageState.COMPLETE;
234 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
235 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
236 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
237 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
238 exchangeHandler.handleRequest(null, null, responseChannel, context);
239 }
240
241 void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
242 if (done.get() || requestState != MessageState.HEADERS) {
243 throw new ProtocolException("Unexpected message head");
244 }
245 receivedRequest = request;
246 requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
247
248 AsyncServerExchangeHandler handler;
249 try {
250 handler = exchangeHandlerFactory.create(request, context);
251 } catch (final MisdirectedRequestException ex) {
252 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
253 } catch (final HttpException ex) {
254 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
255 }
256 if (handler == null) {
257 handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
258 }
259
260 exchangeHandler = handler;
261
262 final ProtocolVersion transportVersion = request.getVersion();
263 if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
264 throw new UnsupportedHttpVersionException(transportVersion);
265 }
266 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
267 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
268
269 try {
270 httpProcessor.process(request, requestEntityDetails, context);
271 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
272 } catch (final HttpException ex) {
273 if (!responseCommitted.get()) {
274 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
275 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
276 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
277 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
278 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
279 } else {
280 throw ex;
281 }
282 }
283
284 }
285
286 boolean isOutputReady() {
287 switch (responseState) {
288 case BODY:
289 return exchangeHandler.available() > 0;
290 default:
291 return false;
292 }
293 }
294
295 void produceOutput() throws HttpException, IOException {
296 switch (responseState) {
297 case BODY:
298 exchangeHandler.produce(internalDataChannel);
299 break;
300 }
301 }
302
303 void consumeData(final ByteBuffer src) throws HttpException, IOException {
304 if (done.get() || requestState != MessageState.BODY) {
305 throw new ProtocolException("Unexpected message data");
306 }
307 if (responseState == MessageState.ACK) {
308 outputChannel.requestOutput();
309 }
310 exchangeHandler.consume(src);
311 }
312
313 void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
314 exchangeHandler.updateCapacity(capacityChannel);
315 }
316
317 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
318 if (done.get() || requestState != MessageState.BODY) {
319 throw new ProtocolException("Unexpected message data");
320 }
321 requestState = MessageState.COMPLETE;
322 exchangeHandler.streamEnd(trailers);
323 }
324
325 void failed(final Exception cause) {
326 if (!done.get()) {
327 exchangeHandler.failed(cause);
328 }
329 }
330
331 @Override
332 public void releaseResources() {
333 if (done.compareAndSet(false, true)) {
334 requestState = MessageState.COMPLETE;
335 responseState = MessageState.COMPLETE;
336 exchangeHandler.releaseResources();
337 }
338 }
339
340 void appendState(final StringBuilder buf) {
341 buf.append("requestState=").append(requestState)
342 .append(", responseState=").append(responseState)
343 .append(", responseCommitted=").append(responseCommitted)
344 .append(", keepAlive=").append(keepAlive)
345 .append(", done=").append(done);
346 }
347
348 @Override
349 public String toString() {
350 final StringBuilder buf = new StringBuilder();
351 buf.append("[");
352 appendState(buf);
353 buf.append("]");
354 return buf.toString();
355 }
356
357 }
358