/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.http.impl.nio.codecs;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.FileChannel;
33  import java.nio.channels.WritableByteChannel;
34  
35  import org.apache.http.impl.io.HttpTransportMetricsImpl;
36  import org.apache.http.nio.FileContentEncoder;
37  import org.apache.http.nio.reactor.SessionOutputBuffer;
38  import org.apache.http.util.Args;
39  
40  /**
41   * Content encoder that cuts off after a defined number of bytes. This class
42   * is used to send content of HTTP messages where the end of the content entity
43   * is determined by the value of the {@code Content-Length header}.
44   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
45   * long.
46   * <p>
47   * This decoder is optimized to transfer data directly from
48   * a {@link FileChannel} to the underlying I/O session's channel whenever
49   * possible avoiding intermediate buffering in the session buffer.
50   *
51   * @since 4.0
52   */
53  public class LengthDelimitedEncoder extends AbstractContentEncoder
54          implements FileContentEncoder {
55  
56      private final long contentLength;
57      private final int fragHint;
58  
59      private long remaining;
60  
61      /**
62       * @since 4.3
63       *
64       * @param channel underlying channel.
65       * @param buffer  session buffer.
66       * @param metrics transport metrics.
67       * @param contentLength content length.
68       * @param fragementSizeHint fragment size hint defining an minimal size of a fragment
69       *   that should be written out directly to the channel bypassing the session buffer.
70       *   Value {@code 0} disables fragment buffering.
71       */
72      public LengthDelimitedEncoder(
73              final WritableByteChannel channel,
74              final SessionOutputBuffer buffer,
75              final HttpTransportMetricsImpl metrics,
76              final long contentLength,
77              final int fragementSizeHint) {
78          super(channel, buffer, metrics);
79          Args.notNegative(contentLength, "Content length");
80          this.contentLength = contentLength;
81          this.fragHint = fragementSizeHint > 0 ? fragementSizeHint : 0;
82          this.remaining = contentLength;
83      }
84  
85      public LengthDelimitedEncoder(
86              final WritableByteChannel channel,
87              final SessionOutputBuffer buffer,
88              final HttpTransportMetricsImpl metrics,
89              final long contentLength) {
90          this(channel, buffer, metrics, contentLength, 0);
91      }
92  
93      private int nextChunk(final ByteBuffer src) {
94          return (int) Math.min(Math.min(this.remaining, Integer.MAX_VALUE), src.remaining());
95      }
96  
97      @Override
98      public int write(final ByteBuffer src) throws IOException {
99          if (src == null) {
100             return 0;
101         }
102         assertNotCompleted();
103 
104         int total = 0;
105         while (src.hasRemaining() && this.remaining > 0) {
106             if (this.buffer.hasData() || this.fragHint > 0) {
107                 final int chunk = nextChunk(src);
108                 if (chunk <= this.fragHint) {
109                     final int capacity = this.fragHint - this.buffer.length();
110                     if (capacity > 0) {
111                         final int limit = Math.min(capacity, chunk);
112                         final int bytesWritten = writeToBuffer(src, limit);
113                         this.remaining -= bytesWritten;
114                         total += bytesWritten;
115                     }
116                 }
117             }
118             if (this.buffer.hasData()) {
119                 final int chunk = nextChunk(src);
120                 if (this.buffer.length() >= this.fragHint || chunk > 0) {
121                     final int bytesWritten = flushToChannel();
122                     if (bytesWritten == 0) {
123                         break;
124                     }
125                 }
126             }
127             if (!this.buffer.hasData()) {
128                 final int chunk = nextChunk(src);
129                 if (chunk > this.fragHint) {
130                     final int bytesWritten = writeToChannel(src, chunk);
131                     this.remaining -= bytesWritten;
132                     total += bytesWritten;
133                     if (bytesWritten == 0) {
134                         break;
135                     }
136                 }
137             }
138         }
139         if (this.remaining <= 0) {
140             super.complete();
141         }
142         return total;
143     }
144 
145     @Override
146     public long transfer(
147             final FileChannel src,
148             final long position,
149             final long count) throws IOException {
150 
151         if (src == null) {
152             return 0;
153         }
154         assertNotCompleted();
155 
156         flushToChannel();
157         if (this.buffer.hasData()) {
158             return 0;
159         }
160 
161         final long chunk = Math.min(this.remaining, count);
162         final long bytesWritten = src.transferTo(position, chunk, this.channel);
163         if (bytesWritten > 0) {
164             this.metrics.incrementBytesTransferred(bytesWritten);
165         }
166         this.remaining -= bytesWritten;
167         if (this.remaining <= 0) {
168             super.complete();
169         }
170         return bytesWritten;
171     }
172 
173     @Override
174     public String toString() {
175         final StringBuilder sb = new StringBuilder();
176         sb.append("[content length: ");
177         sb.append(this.contentLength);
178         sb.append("; pos: ");
179         sb.append(this.contentLength - this.remaining);
180         sb.append("; completed: ");
181         sb.append(isCompleted());
182         sb.append("]");
183         return sb.toString();
184     }
185 
186 }