1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.http.ConnectionClosedException;
40 import org.apache.hc.core5.http.ConnectionReuseStrategy;
41 import org.apache.hc.core5.http.ContentLengthStrategy;
42 import org.apache.hc.core5.http.EntityDetails;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.LengthRequiredException;
49 import org.apache.hc.core5.http.config.CharCodingConfig;
50 import org.apache.hc.core5.http.config.Http1Config;
51 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
52 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
53 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
54 import org.apache.hc.core5.http.impl.Http1StreamListener;
55 import org.apache.hc.core5.http.message.MessageSupport;
56 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57 import org.apache.hc.core5.http.nio.CapacityChannel;
58 import org.apache.hc.core5.http.nio.ContentDecoder;
59 import org.apache.hc.core5.http.nio.ContentEncoder;
60 import org.apache.hc.core5.http.nio.NHttpMessageParser;
61 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
62 import org.apache.hc.core5.http.nio.SessionInputBuffer;
63 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
64 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65 import org.apache.hc.core5.http.protocol.HttpCoreContext;
66 import org.apache.hc.core5.http.protocol.HttpProcessor;
67 import org.apache.hc.core5.io.CloseMode;
68 import org.apache.hc.core5.reactor.ProtocolIOSession;
69 import org.apache.hc.core5.util.Args;
70 import org.apache.hc.core5.util.Asserts;
71 import org.apache.hc.core5.util.Timeout;
72
73
74
75
76
77
78
79
80 @Internal
81 public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
82
83 private final HttpProcessor httpProcessor;
84 private final ConnectionReuseStrategy connectionReuseStrategy;
85 private final Http1Config http1Config;
86 private final Http1StreamListener streamListener;
87 private final Queue<ClientHttp1StreamHandler> pipeline;
88 private final Http1StreamChannel<HttpRequest> outputChannel;
89
90 private volatile ClientHttp1StreamHandler outgoing;
91 private volatile ClientHttp1StreamHandler incoming;
92
93 public ClientHttp1StreamDuplexer(
94 final ProtocolIOSession ioSession,
95 final HttpProcessor httpProcessor,
96 final Http1Config http1Config,
97 final CharCodingConfig charCodingConfig,
98 final ConnectionReuseStrategy connectionReuseStrategy,
99 final NHttpMessageParser<HttpResponse> incomingMessageParser,
100 final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
101 final ContentLengthStrategy incomingContentStrategy,
102 final ContentLengthStrategy outgoingContentStrategy,
103 final Http1StreamListener streamListener) {
104 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
105 incomingContentStrategy, outgoingContentStrategy);
106 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
107 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
108 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
109 DefaultConnectionReuseStrategy.INSTANCE;
110 this.streamListener = streamListener;
111 this.pipeline = new ConcurrentLinkedQueue<>();
112 this.outputChannel = new Http1StreamChannel<HttpRequest>() {
113
114 @Override
115 public void close() {
116 shutdownSession(CloseMode.IMMEDIATE);
117 }
118
119 @Override
120 public void submit(
121 final HttpRequest request,
122 final boolean endStream,
123 final FlushMode flushMode) throws HttpException, IOException {
124 if (streamListener != null) {
125 streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
126 }
127 commitMessageHead(request, endStream, flushMode);
128 }
129
130 @Override
131 public void suspendOutput() throws IOException {
132 suspendSessionOutput();
133 }
134
135 @Override
136 public void requestOutput() {
137 requestSessionOutput();
138 }
139
140 @Override
141 public Timeout getSocketTimeout() {
142 return getSessionTimeout();
143 }
144
145 @Override
146 public void setSocketTimeout(final Timeout timeout) {
147 setSessionTimeout(timeout);
148 }
149
150 @Override
151 public int write(final ByteBuffer src) throws IOException {
152 return streamOutput(src);
153 }
154
155 @Override
156 public void complete(final List<? extends Header> trailers) throws IOException {
157 endOutputStream(trailers);
158 }
159
160 @Override
161 public boolean isCompleted() {
162 return isOutputCompleted();
163 }
164
165 @Override
166 public boolean abortGracefully() throws IOException {
167 final MessageDelineation messageDelineation = endOutputStream(null);
168 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
169 }
170
171 @Override
172 public void activate() throws HttpException, IOException {
173 }
174
175 };
176 }
177
178 @Override
179 void terminate(final Exception exception) {
180 if (incoming != null) {
181 incoming.failed(exception);
182 incoming.releaseResources();
183 incoming = null;
184 }
185 if (outgoing != null) {
186 outgoing.failed(exception);
187 outgoing.releaseResources();
188 outgoing = null;
189 }
190 for (;;) {
191 final ClientHttp1StreamHandler handler = pipeline.poll();
192 if (handler != null) {
193 handler.failed(exception);
194 handler.releaseResources();
195 } else {
196 break;
197 }
198 }
199 }
200
201 @Override
202 void disconnected() {
203 if (incoming != null) {
204 if (!incoming.isCompleted()) {
205 incoming.failed(new ConnectionClosedException());
206 }
207 incoming.releaseResources();
208 incoming = null;
209 }
210 if (outgoing != null) {
211 if (!outgoing.isCompleted()) {
212 outgoing.failed(new ConnectionClosedException());
213 }
214 outgoing.releaseResources();
215 outgoing = null;
216 }
217 for (;;) {
218 final ClientHttp1StreamHandler handler = pipeline.poll();
219 if (handler != null) {
220 handler.failed(new ConnectionClosedException());
221 handler.releaseResources();
222 } else {
223 break;
224 }
225 }
226 }
227
228 @Override
229 void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
230 if (response.getCode() >= HttpStatus.SC_OK) {
231 connMetrics.incrementRequestCount();
232 }
233 }
234
235 @Override
236 void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
237 connMetrics.incrementRequestCount();
238 }
239
240 @Override
241 protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
242
243 if (incoming == null) {
244 incoming = pipeline.poll();
245 }
246 if (incoming == null) {
247 throw new HttpException("Unexpected response");
248 }
249 return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
250 }
251
252 @Override
253 protected ContentDecoder createContentDecoder(
254 final long len,
255 final ReadableByteChannel channel,
256 final SessionInputBuffer buffer,
257 final BasicHttpTransportMetrics metrics) throws HttpException {
258
259 if (len >= 0) {
260 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
261 } else if (len == ContentLengthStrategy.CHUNKED) {
262 return new ChunkDecoder(channel, buffer, http1Config, metrics);
263 } else {
264 return new IdentityDecoder(channel, buffer, metrics);
265 }
266 }
267
268 @Override
269 protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
270 return true;
271 }
272
273 @Override
274 protected ContentEncoder createContentEncoder(
275 final long len,
276 final WritableByteChannel channel,
277 final SessionOutputBuffer buffer,
278 final BasicHttpTransportMetrics metrics) throws HttpException {
279 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
280 if (len >= 0) {
281 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
282 } else if (len == ContentLengthStrategy.CHUNKED) {
283 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
284 } else {
285 throw new LengthRequiredException();
286 }
287 }
288
289 @Override
290 boolean inputIdle() {
291 return incoming == null;
292 }
293
294 @Override
295 boolean outputIdle() {
296 return outgoing == null && pipeline.isEmpty();
297 }
298
299 @Override
300 void outputEnd() throws HttpException, IOException {
301 if (outgoing != null) {
302 if (outgoing.isCompleted()) {
303 outgoing.releaseResources();
304 }
305 outgoing = null;
306 }
307 }
308
309 @Override
310 void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
311 final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
312 final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
313 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
314 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
315 final ClientHttp1StreamHandlerntHttp1StreamHandler.html#ClientHttp1StreamHandler">ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
316 outputChannel,
317 httpProcessor,
318 http1Config,
319 connectionReuseStrategy,
320 exchangeHandler,
321 context);
322 pipeline.add(handler);
323 outgoing = handler;
324
325 if (handler.isOutputReady()) {
326 handler.produceOutput();
327 }
328 }
329
330 @Override
331 boolean isOutputReady() {
332 return outgoing != null && outgoing.isOutputReady();
333 }
334
335 @Override
336 void produceOutput() throws HttpException, IOException {
337 if (outgoing != null) {
338 outgoing.produceOutput();
339 }
340 }
341
342 @Override
343 void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
344 if (streamListener != null) {
345 streamListener.onResponseHead(this, response);
346 }
347 Asserts.notNull(incoming, "Response stream handler");
348 incoming.consumeHeader(response, entityDetails);
349 }
350
351 @Override
352 void consumeData(final ByteBuffer src) throws HttpException, IOException {
353 Asserts.notNull(incoming, "Response stream handler");
354 incoming.consumeData(src);
355 }
356
357 @Override
358 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
359 Asserts.notNull(incoming, "Response stream handler");
360 incoming.updateCapacity(capacityChannel);
361 }
362
363 @Override
364 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
365 Asserts.notNull(incoming, "Response stream handler");
366 incoming.dataEnd(trailers);
367 }
368
369 @Override
370 void inputEnd() throws HttpException, IOException {
371 if (incoming != null && incoming.isResponseFinal()) {
372 if (streamListener != null) {
373 streamListener.onExchangeComplete(this, isOpen());
374 }
375 if (incoming.isCompleted()) {
376 incoming.releaseResources();
377 }
378 incoming = null;
379 }
380 }
381
382 @Override
383 boolean handleTimeout() {
384 return outgoing != null && outgoing.handleTimeout();
385 }
386
387 @Override
388 void appendState(final StringBuilder buf) {
389 super.appendState(buf);
390 super.appendState(buf);
391 buf.append(", incoming=[");
392 if (incoming != null) {
393 incoming.appendState(buf);
394 }
395 buf.append("], outgoing=[");
396 if (outgoing != null) {
397 outgoing.appendState(buf);
398 }
399 buf.append("], pipeline=");
400 buf.append(pipeline.size());
401 }
402
403 @Override
404 public String toString() {
405 final StringBuilder buf = new StringBuilder();
406 buf.append("[");
407 appendState(buf);
408 buf.append("]");
409 return buf.toString();
410 }
411
412 }