1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.http.ConnectionClosedException;
40 import org.apache.hc.core5.http.ConnectionReuseStrategy;
41 import org.apache.hc.core5.http.ContentLengthStrategy;
42 import org.apache.hc.core5.http.EntityDetails;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.LengthRequiredException;
49 import org.apache.hc.core5.http.config.CharCodingConfig;
50 import org.apache.hc.core5.http.config.Http1Config;
51 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
52 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
53 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
54 import org.apache.hc.core5.http.impl.Http1StreamListener;
55 import org.apache.hc.core5.http.message.MessageSupport;
56 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57 import org.apache.hc.core5.http.nio.CapacityChannel;
58 import org.apache.hc.core5.http.nio.ContentDecoder;
59 import org.apache.hc.core5.http.nio.ContentEncoder;
60 import org.apache.hc.core5.http.nio.NHttpMessageParser;
61 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
62 import org.apache.hc.core5.http.nio.SessionInputBuffer;
63 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
64 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65 import org.apache.hc.core5.http.protocol.HttpCoreContext;
66 import org.apache.hc.core5.http.protocol.HttpProcessor;
67 import org.apache.hc.core5.io.CloseMode;
68 import org.apache.hc.core5.reactor.ProtocolIOSession;
69 import org.apache.hc.core5.util.Args;
70 import org.apache.hc.core5.util.Asserts;
71 import org.apache.hc.core5.util.Timeout;
72
73
74
75
76
77
78
79
80 @Internal
81 public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
82
83 private final HttpProcessor httpProcessor;
84 private final ConnectionReuseStrategy connectionReuseStrategy;
85 private final Http1Config http1Config;
86 private final Http1StreamListener streamListener;
87 private final Queue<ClientHttp1StreamHandler> pipeline;
88 private final Http1StreamChannel<HttpRequest> outputChannel;
89
90 private volatile ClientHttp1StreamHandler outgoing;
91 private volatile ClientHttp1StreamHandler incoming;
92
93 public ClientHttp1StreamDuplexer(
94 final ProtocolIOSession ioSession,
95 final HttpProcessor httpProcessor,
96 final Http1Config http1Config,
97 final CharCodingConfig charCodingConfig,
98 final ConnectionReuseStrategy connectionReuseStrategy,
99 final NHttpMessageParser<HttpResponse> incomingMessageParser,
100 final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
101 final ContentLengthStrategy incomingContentStrategy,
102 final ContentLengthStrategy outgoingContentStrategy,
103 final Http1StreamListener streamListener) {
104 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
105 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
106 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
107 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
108 DefaultConnectionReuseStrategy.INSTANCE;
109 this.streamListener = streamListener;
110 this.pipeline = new ConcurrentLinkedQueue<>();
111 this.outputChannel = new Http1StreamChannel<HttpRequest>() {
112
113 @Override
114 public void close() {
115 shutdownSession(CloseMode.IMMEDIATE);
116 }
117
118 @Override
119 public void submit(
120 final HttpRequest request,
121 final boolean endStream,
122 final FlushMode flushMode) throws HttpException, IOException {
123 if (streamListener != null) {
124 streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
125 }
126 commitMessageHead(request, endStream, flushMode);
127 }
128
129 @Override
130 public void suspendOutput() throws IOException {
131 suspendSessionOutput();
132 }
133
134 @Override
135 public void requestOutput() {
136 requestSessionOutput();
137 }
138
139 @Override
140 public Timeout getSocketTimeout() {
141 return getSessionTimeout();
142 }
143
144 @Override
145 public void setSocketTimeout(final Timeout timeout) {
146 setSessionTimeout(timeout);
147 }
148
149 @Override
150 public int write(final ByteBuffer src) throws IOException {
151 return streamOutput(src);
152 }
153
154 @Override
155 public void complete(final List<? extends Header> trailers) throws IOException {
156 endOutputStream(trailers);
157 }
158
159 @Override
160 public boolean isCompleted() {
161 return isOutputCompleted();
162 }
163
164 @Override
165 public boolean abortGracefully() throws IOException {
166 final MessageDelineation messageDelineation = endOutputStream(null);
167 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
168 }
169
170 @Override
171 public void activate() throws HttpException, IOException {
172 }
173
174 };
175 }
176
177 @Override
178 void terminate(final Exception exception) {
179 if (incoming != null) {
180 incoming.failed(exception);
181 incoming.releaseResources();
182 incoming = null;
183 }
184 if (outgoing != null) {
185 outgoing.failed(exception);
186 outgoing.releaseResources();
187 outgoing = null;
188 }
189 for (;;) {
190 final ClientHttp1StreamHandler handler = pipeline.poll();
191 if (handler != null) {
192 handler.failed(exception);
193 handler.releaseResources();
194 } else {
195 break;
196 }
197 }
198 }
199
200 @Override
201 void disconnected() {
202 if (incoming != null) {
203 if (!incoming.isCompleted()) {
204 incoming.failed(new ConnectionClosedException());
205 }
206 incoming.releaseResources();
207 incoming = null;
208 }
209 if (outgoing != null) {
210 if (!outgoing.isCompleted()) {
211 outgoing.failed(new ConnectionClosedException());
212 }
213 outgoing.releaseResources();
214 outgoing = null;
215 }
216 for (;;) {
217 final ClientHttp1StreamHandler handler = pipeline.poll();
218 if (handler != null) {
219 handler.failed(new ConnectionClosedException());
220 handler.releaseResources();
221 } else {
222 break;
223 }
224 }
225 }
226
227 @Override
228 void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
229 if (response.getCode() >= HttpStatus.SC_OK) {
230 connMetrics.incrementRequestCount();
231 }
232 }
233
234 @Override
235 void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
236 connMetrics.incrementRequestCount();
237 }
238
239 @Override
240 protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
241
242 if (incoming == null) {
243 incoming = pipeline.poll();
244 }
245 if (incoming == null) {
246 throw new HttpException("Unexpected response");
247 }
248 return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
249 }
250
251 @Override
252 protected ContentDecoder createContentDecoder(
253 final long len,
254 final ReadableByteChannel channel,
255 final SessionInputBuffer buffer,
256 final BasicHttpTransportMetrics metrics) throws HttpException {
257
258 if (len >= 0) {
259 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
260 } else if (len == ContentLengthStrategy.CHUNKED) {
261 return new ChunkDecoder(channel, buffer, http1Config, metrics);
262 } else {
263 return new IdentityDecoder(channel, buffer, metrics);
264 }
265 }
266
267 @Override
268 protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
269 return true;
270 }
271
272 @Override
273 protected ContentEncoder createContentEncoder(
274 final long len,
275 final WritableByteChannel channel,
276 final SessionOutputBuffer buffer,
277 final BasicHttpTransportMetrics metrics) throws HttpException {
278 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
279 if (len >= 0) {
280 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
281 } else if (len == ContentLengthStrategy.CHUNKED) {
282 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
283 } else {
284 throw new LengthRequiredException();
285 }
286 }
287
288 @Override
289 boolean inputIdle() {
290 return incoming == null;
291 }
292
293 @Override
294 boolean outputIdle() {
295 return outgoing == null && pipeline.isEmpty();
296 }
297
298 @Override
299 void outputEnd() throws HttpException, IOException {
300 if (outgoing != null) {
301 if (outgoing.isCompleted()) {
302 outgoing.releaseResources();
303 }
304 outgoing = null;
305 }
306 }
307
308 @Override
309 void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
310 final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
311 final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
312 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
313 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
314 final ClientHttp1StreamHandlerntHttp1StreamHandler.html#ClientHttp1StreamHandler">ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
315 outputChannel,
316 httpProcessor,
317 http1Config,
318 connectionReuseStrategy,
319 exchangeHandler,
320 context);
321 pipeline.add(handler);
322 outgoing = handler;
323
324 if (handler.isOutputReady()) {
325 handler.produceOutput();
326 }
327 }
328
329 @Override
330 boolean isOutputReady() {
331 return outgoing != null && outgoing.isOutputReady();
332 }
333
334 @Override
335 void produceOutput() throws HttpException, IOException {
336 if (outgoing != null) {
337 outgoing.produceOutput();
338 }
339 }
340
341 @Override
342 void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
343 if (streamListener != null) {
344 streamListener.onResponseHead(this, response);
345 }
346 Asserts.notNull(incoming, "Response stream handler");
347 incoming.consumeHeader(response, entityDetails);
348 }
349
350 @Override
351 void consumeData(final ByteBuffer src) throws HttpException, IOException {
352 Asserts.notNull(incoming, "Response stream handler");
353 incoming.consumeData(src);
354 }
355
356 @Override
357 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
358 Asserts.notNull(incoming, "Response stream handler");
359 incoming.updateCapacity(capacityChannel);
360 }
361
362 @Override
363 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
364 Asserts.notNull(incoming, "Response stream handler");
365 incoming.dataEnd(trailers);
366 }
367
368 @Override
369 void inputEnd() throws HttpException, IOException {
370 if (incoming != null && incoming.isResponseFinal()) {
371 if (streamListener != null) {
372 streamListener.onExchangeComplete(this, isOpen());
373 }
374 if (incoming.isCompleted()) {
375 incoming.releaseResources();
376 }
377 incoming = null;
378 }
379 }
380
381 @Override
382 boolean handleTimeout() {
383 return outgoing != null && outgoing.handleTimeout();
384 }
385
386 @Override
387 void appendState(final StringBuilder buf) {
388 super.appendState(buf);
389 super.appendState(buf);
390 buf.append(", incoming=[");
391 if (incoming != null) {
392 incoming.appendState(buf);
393 }
394 buf.append("], outgoing=[");
395 if (outgoing != null) {
396 outgoing.appendState(buf);
397 }
398 buf.append("], pipeline=");
399 buf.append(pipeline.size());
400 }
401
402 @Override
403 public String toString() {
404 final StringBuilder buf = new StringBuilder();
405 buf.append("[");
406 appendState(buf);
407 buf.append("]");
408 return buf.toString();
409 }
410
411 }