1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.nio.channels.ReadableByteChannel;
36 import java.nio.channels.WritableByteChannel;
37 import java.nio.charset.Charset;
38 import java.nio.charset.CharsetDecoder;
39 import java.nio.charset.CharsetEncoder;
40 import java.nio.charset.CodingErrorAction;
41
42 import org.apache.http.ConnectionClosedException;
43 import org.apache.http.Consts;
44 import org.apache.http.Header;
45 import org.apache.http.HttpConnectionMetrics;
46 import org.apache.http.HttpEntity;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpInetConnection;
49 import org.apache.http.HttpMessage;
50 import org.apache.http.HttpRequest;
51 import org.apache.http.HttpResponse;
52 import org.apache.http.config.MessageConstraints;
53 import org.apache.http.entity.BasicHttpEntity;
54 import org.apache.http.entity.ContentLengthStrategy;
55 import org.apache.http.impl.HttpConnectionMetricsImpl;
56 import org.apache.http.impl.entity.LaxContentLengthStrategy;
57 import org.apache.http.impl.entity.StrictContentLengthStrategy;
58 import org.apache.http.impl.io.HttpTransportMetricsImpl;
59 import org.apache.http.impl.nio.codecs.ChunkDecoder;
60 import org.apache.http.impl.nio.codecs.ChunkEncoder;
61 import org.apache.http.impl.nio.codecs.IdentityDecoder;
62 import org.apache.http.impl.nio.codecs.IdentityEncoder;
63 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
64 import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
65 import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
66 import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
67 import org.apache.http.io.HttpTransportMetrics;
68 import org.apache.http.nio.ContentDecoder;
69 import org.apache.http.nio.ContentEncoder;
70 import org.apache.http.nio.NHttpConnection;
71 import org.apache.http.nio.reactor.EventMask;
72 import org.apache.http.nio.reactor.IOSession;
73 import org.apache.http.nio.reactor.SessionBufferStatus;
74 import org.apache.http.nio.reactor.SessionInputBuffer;
75 import org.apache.http.nio.reactor.SessionOutputBuffer;
76 import org.apache.http.nio.reactor.SocketAccessor;
77 import org.apache.http.nio.util.ByteBufferAllocator;
78 import org.apache.http.params.CoreConnectionPNames;
79 import org.apache.http.params.CoreProtocolPNames;
80 import org.apache.http.params.HttpParams;
81 import org.apache.http.protocol.HTTP;
82 import org.apache.http.protocol.HttpContext;
83 import org.apache.http.util.Args;
84 import org.apache.http.util.CharsetUtils;
85 import org.apache.http.util.NetUtils;
86
87
88
89
90
91
92
93 @SuppressWarnings("deprecation")
94 public class NHttpConnectionBase
95 implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
96
97 protected final ContentLengthStrategy incomingContentStrategy;
98 protected final ContentLengthStrategy outgoingContentStrategy;
99
100 protected final SessionInputBufferImpl inbuf;
101 protected final SessionOutputBufferImpl outbuf;
102 private final int fragmentSizeHint;
103 private final MessageConstraints constraints;
104
105 protected final HttpTransportMetricsImpl inTransportMetrics;
106 protected final HttpTransportMetricsImpl outTransportMetrics;
107 protected final HttpConnectionMetricsImpl connMetrics;
108
109 protected HttpContext context;
110 protected IOSession session;
111 protected SocketAddress remote;
112 protected volatile ContentDecoder contentDecoder;
113 protected volatile boolean hasBufferedInput;
114 protected volatile ContentEncoder contentEncoder;
115 protected volatile boolean hasBufferedOutput;
116 protected volatile HttpRequest request;
117 protected volatile HttpResponse response;
118
119 protected volatile int status;
120
121
122
123
124
125
126
127
128
129
130
131
132 @Deprecated
133 public NHttpConnectionBase(
134 final IOSession session,
135 final ByteBufferAllocator allocator,
136 final HttpParams params) {
137 super();
138 Args.notNull(session, "I/O session");
139 Args.notNull(params, "HTTP params");
140
141 int bufferSize = params.getIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, -1);
142 if (bufferSize <= 0) {
143 bufferSize = 4096;
144 }
145 int lineBufferSize = bufferSize;
146 if (lineBufferSize > 512) {
147 lineBufferSize = 512;
148 }
149
150 CharsetDecoder decoder = null;
151 CharsetEncoder encoder = null;
152 Charset charset = CharsetUtils.lookup(
153 (String) params.getParameter(CoreProtocolPNames.HTTP_ELEMENT_CHARSET));
154 if (charset != null) {
155 charset = Consts.ASCII;
156 decoder = charset.newDecoder();
157 encoder = charset.newEncoder();
158 final CodingErrorAction malformedCharAction = (CodingErrorAction) params.getParameter(
159 CoreProtocolPNames.HTTP_MALFORMED_INPUT_ACTION);
160 final CodingErrorAction unmappableCharAction = (CodingErrorAction) params.getParameter(
161 CoreProtocolPNames.HTTP_UNMAPPABLE_INPUT_ACTION);
162 decoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
163 encoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
164 }
165 this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, decoder, allocator);
166 this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, encoder, allocator);
167 this.fragmentSizeHint = bufferSize;
168 this.constraints = MessageConstraints.DEFAULT;
169
170 this.incomingContentStrategy = createIncomingContentStrategy();
171 this.outgoingContentStrategy = createOutgoingContentStrategy();
172
173 this.inTransportMetrics = createTransportMetrics();
174 this.outTransportMetrics = createTransportMetrics();
175 this.connMetrics = createConnectionMetrics(
176 this.inTransportMetrics,
177 this.outTransportMetrics);
178
179 setSession(session);
180 this.status = ACTIVE;
181 }
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 protected NHttpConnectionBase(
206 final IOSession session,
207 final int bufferSize,
208 final int fragmentSizeHint,
209 final ByteBufferAllocator allocator,
210 final CharsetDecoder charDecoder,
211 final CharsetEncoder charEncoder,
212 final MessageConstraints constraints,
213 final ContentLengthStrategy incomingContentStrategy,
214 final ContentLengthStrategy outgoingContentStrategy) {
215 Args.notNull(session, "I/O session");
216 Args.positive(bufferSize, "Buffer size");
217 int lineBufferSize = bufferSize;
218 if (lineBufferSize > 512) {
219 lineBufferSize = 512;
220 }
221 this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, charDecoder, allocator);
222 this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, charEncoder, allocator);
223 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize;
224
225 this.inTransportMetrics = new HttpTransportMetricsImpl();
226 this.outTransportMetrics = new HttpTransportMetricsImpl();
227 this.connMetrics = new HttpConnectionMetricsImpl(this.inTransportMetrics, this.outTransportMetrics);
228 this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
229 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
230 LaxContentLengthStrategy.INSTANCE;
231 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
232 StrictContentLengthStrategy.INSTANCE;
233
234 setSession(session);
235 this.status = ACTIVE;
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258 protected NHttpConnectionBase(
259 final IOSession session,
260 final int bufferSize,
261 final int fragmentSizeHint,
262 final ByteBufferAllocator allocator,
263 final CharsetDecoder charDecoder,
264 final CharsetEncoder charEncoder,
265 final ContentLengthStrategy incomingContentStrategy,
266 final ContentLengthStrategy outgoingContentStrategy) {
267 this(session, bufferSize, fragmentSizeHint, allocator, charDecoder, charEncoder,
268 null, incomingContentStrategy, outgoingContentStrategy);
269 }
270
271 private void setSession(final IOSession session) {
272 this.session = session;
273 this.context = new SessionHttpContext(this.session);
274 this.session.setBufferStatus(this);
275 this.remote = this.session.getRemoteAddress();
276 }
277
278
279
280
281
282
283
284 protected void bind(final IOSession session) {
285 Args.notNull(session, "I/O session");
286 setSession(session);
287 }
288
289
290
291
292
293
294 @Deprecated
295 protected ContentLengthStrategy createIncomingContentStrategy() {
296 return new LaxContentLengthStrategy();
297 }
298
299
300
301
302
303
304 @Deprecated
305 protected ContentLengthStrategy createOutgoingContentStrategy() {
306 return new StrictContentLengthStrategy();
307 }
308
309
310
311
312
313
314 @Deprecated
315 protected HttpTransportMetricsImpl createTransportMetrics() {
316 return new HttpTransportMetricsImpl();
317 }
318
319
320
321
322
323
324 @Deprecated
325 protected HttpConnectionMetricsImpl createConnectionMetrics(
326 final HttpTransportMetrics inTransportMetric,
327 final HttpTransportMetrics outTransportMetric) {
328 return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
329 }
330
331 @Override
332 public int getStatus() {
333 return this.status;
334 }
335
336 @Override
337 public HttpContext getContext() {
338 return this.context;
339 }
340
341 @Override
342 public HttpRequest getHttpRequest() {
343 return this.request;
344 }
345
346 @Override
347 public HttpResponse getHttpResponse() {
348 return this.response;
349 }
350
351 @Override
352 public void requestInput() {
353 this.session.setEvent(EventMask.READ);
354 }
355
356 @Override
357 public void requestOutput() {
358 this.session.setEvent(EventMask.WRITE);
359 }
360
361 @Override
362 public void suspendInput() {
363 this.session.clearEvent(EventMask.READ);
364 }
365
366 @Override
367 public void suspendOutput() {
368 synchronized (this.session) {
369 if (!this.outbuf.hasData()) {
370 this.session.clearEvent(EventMask.WRITE);
371 }
372 }
373 }
374
375
376
377
378
379
380
381
382
383
384 protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
385 final BasicHttpEntity entity = new BasicHttpEntity();
386 final long len = this.incomingContentStrategy.determineLength(message);
387 this.contentDecoder = createContentDecoder(
388 len,
389 this.session.channel(),
390 this.inbuf,
391 this.inTransportMetrics);
392 if (len == ContentLengthStrategy.CHUNKED) {
393 entity.setChunked(true);
394 entity.setContentLength(-1);
395 } else if (len == ContentLengthStrategy.IDENTITY) {
396 entity.setChunked(false);
397 entity.setContentLength(-1);
398 } else {
399 entity.setChunked(false);
400 entity.setContentLength(len);
401 }
402
403 final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
404 if (contentTypeHeader != null) {
405 entity.setContentType(contentTypeHeader);
406 }
407 final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
408 if (contentEncodingHeader != null) {
409 entity.setContentEncoding(contentEncodingHeader);
410 }
411 return entity;
412 }
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427 protected ContentDecoder createContentDecoder(
428 final long len,
429 final ReadableByteChannel channel,
430 final SessionInputBuffer buffer,
431 final HttpTransportMetricsImpl metrics) {
432 if (len == ContentLengthStrategy.CHUNKED) {
433 return new ChunkDecoder(channel, buffer, this.constraints, metrics);
434 } else if (len == ContentLengthStrategy.IDENTITY) {
435 return new IdentityDecoder(channel, buffer, metrics);
436 } else {
437 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
438 }
439 }
440
441
442
443
444
445
446
447
448 protected void prepareEncoder(final HttpMessage message) throws HttpException {
449 final long len = this.outgoingContentStrategy.determineLength(message);
450 this.contentEncoder = createContentEncoder(
451 len,
452 this.session.channel(),
453 this.outbuf,
454 this.outTransportMetrics);
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 protected ContentEncoder createContentEncoder(
471 final long len,
472 final WritableByteChannel channel,
473 final SessionOutputBuffer buffer,
474 final HttpTransportMetricsImpl metrics) {
475 if (len == ContentLengthStrategy.CHUNKED) {
476 return new ChunkEncoder(channel, buffer, metrics, this.fragmentSizeHint);
477 } else if (len == ContentLengthStrategy.IDENTITY) {
478 return new IdentityEncoder(channel, buffer, metrics, this.fragmentSizeHint);
479 } else {
480 return new LengthDelimitedEncoder(channel, buffer, metrics, len, this.fragmentSizeHint);
481 }
482 }
483
484 @Override
485 public boolean hasBufferedInput() {
486 return this.hasBufferedInput;
487 }
488
489 @Override
490 public boolean hasBufferedOutput() {
491 return this.hasBufferedOutput;
492 }
493
494
495
496
497
498
499
500 protected void assertNotClosed() throws ConnectionClosedException {
501 if (this.status != ACTIVE) {
502 throw new ConnectionClosedException();
503 }
504 }
505
506 @Override
507 public void close() throws IOException {
508 if (this.status != ACTIVE) {
509 return;
510 }
511 this.status = CLOSING;
512 if (this.outbuf.hasData()) {
513 this.session.setEvent(EventMask.WRITE);
514 } else {
515 this.session.close();
516 this.status = CLOSED;
517 }
518 }
519
520 @Override
521 public boolean isOpen() {
522 return this.status == ACTIVE && !this.session.isClosed();
523 }
524
525 @Override
526 public boolean isStale() {
527 return this.session.isClosed();
528 }
529
530 @Override
531 public InetAddress getLocalAddress() {
532 final SocketAddress address = this.session.getLocalAddress();
533 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
534 }
535
536 @Override
537 public int getLocalPort() {
538 final SocketAddress address = this.session.getLocalAddress();
539 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
540 }
541
542 @Override
543 public InetAddress getRemoteAddress() {
544 final SocketAddress address = this.session.getRemoteAddress();
545 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
546 }
547
548 @Override
549 public int getRemotePort() {
550 final SocketAddress address = this.session.getRemoteAddress();
551 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
552 }
553
554 @Override
555 public void setSocketTimeout(final int timeout) {
556 this.session.setSocketTimeout(timeout);
557 }
558
559 @Override
560 public int getSocketTimeout() {
561 return this.session.getSocketTimeout();
562 }
563
564 @Override
565 public void shutdown() throws IOException {
566 this.status = CLOSED;
567 this.session.shutdown();
568 }
569
570 @Override
571 public HttpConnectionMetrics getMetrics() {
572 return this.connMetrics;
573 }
574
575 @Override
576 public String toString() {
577 final SocketAddress remoteAddress = this.session.getRemoteAddress();
578 final SocketAddress localAddress = this.session.getLocalAddress();
579 if (remoteAddress != null && localAddress != null) {
580 final StringBuilder buffer = new StringBuilder();
581 NetUtils.formatAddress(buffer, localAddress);
582 buffer.append("<->");
583 NetUtils.formatAddress(buffer, remoteAddress);
584 return buffer.toString();
585 }
586 return "[Not bound]";
587 }
588
589 @Override
590 public Socket getSocket() {
591 return this.session instanceof SocketAccessor/../../../../org/apache/http/nio/reactor/SocketAccessor.html#SocketAccessor">SocketAccessor ? ((SocketAccessor) this.session).getSocket() : null;
592 }
593
594 }