View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.List;
33  
34  import org.apache.hc.core5.function.Supplier;
35  import org.apache.hc.core5.http.FormattedHeader;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.StreamClosedException;
38  import org.apache.hc.core5.http.io.SessionOutputBuffer;
39  import org.apache.hc.core5.http.message.BasicLineFormatter;
40  import org.apache.hc.core5.util.Args;
41  import org.apache.hc.core5.util.CharArrayBuffer;
42  
43  /**
44   * Implements chunked transfer coding. The content is sent in small chunks.
45   * Entities transferred using this output stream can be of unlimited length.
46   * Writes are buffered to an internal buffer (2048 default size).
47   * <p>
48   * Note that this class NEVER closes the underlying stream, even when close
49   * gets called.  Instead, the stream will be marked as closed and no further
50   * output will be permitted.
51   *
52   *
53   * @since 4.0
54   */
55  public class ChunkedOutputStream extends OutputStream {
56  
57      private final SessionOutputBuffer buffer;
58      private final OutputStream outputStream;
59  
60      private final byte[] cache;
61      private int cachePosition = 0;
62      private boolean wroteLastChunk = false;
63      private boolean closed = false;
64      private final CharArrayBuffer lineBuffer;
65      private final Supplier<List<? extends Header>> trailerSupplier;
66  
67      /**
68       * Default constructor.
69       *
70       * @param buffer Session output buffer
71       * @param outputStream Output stream
72       * @param chunkSizeHint minimal chunk size hint
73       * @param trailerSupplier Trailer supplier. May be {@code null}
74       *
75       * @since 5.0
76       */
77      public ChunkedOutputStream(
78              final SessionOutputBuffer buffer,
79              final OutputStream outputStream,
80              final int chunkSizeHint,
81              final Supplier<List<? extends Header>> trailerSupplier) {
82          super();
83          this.buffer = Args.notNull(buffer, "Session output buffer");
84          this.outputStream = Args.notNull(outputStream, "Output stream");
85          this.cache = new byte[chunkSizeHint > 0 ? chunkSizeHint : 2048];
86          this.lineBuffer = new CharArrayBuffer(32);
87          this.trailerSupplier = trailerSupplier;
88      }
89  
90      /**
91       * Constructor with no trailers.
92       *
93       * @param buffer Session output buffer
94       * @param outputStream Output stream
95       * @param chunkSizeHint minimal chunk size hint
96       */
97      public ChunkedOutputStream(final SessionOutputBuffer buffer, final OutputStream outputStream, final int chunkSizeHint) {
98          this(buffer, outputStream, chunkSizeHint, null);
99      }
100 
101     /**
102      * Writes the cache out onto the underlying stream
103      */
104     private void flushCache() throws IOException {
105         if (this.cachePosition > 0) {
106             this.lineBuffer.clear();
107             this.lineBuffer.append(Integer.toHexString(this.cachePosition));
108             this.buffer.writeLine(this.lineBuffer, this.outputStream);
109             this.buffer.write(this.cache, 0, this.cachePosition, this.outputStream);
110             this.lineBuffer.clear();
111             this.buffer.writeLine(this.lineBuffer, this.outputStream);
112             this.cachePosition = 0;
113         }
114     }
115 
116     /**
117      * Writes the cache and bufferToAppend to the underlying stream
118      * as one large chunk
119      */
120     private void flushCacheWithAppend(final byte[] bufferToAppend, final int off, final int len) throws IOException {
121         this.lineBuffer.clear();
122         this.lineBuffer.append(Integer.toHexString(this.cachePosition + len));
123         this.buffer.writeLine(this.lineBuffer, this.outputStream);
124         this.buffer.write(this.cache, 0, this.cachePosition, this.outputStream);
125         this.buffer.write(bufferToAppend, off, len, this.outputStream);
126         this.lineBuffer.clear();
127         this.buffer.writeLine(this.lineBuffer, this.outputStream);
128         this.cachePosition = 0;
129     }
130 
131     private void writeClosingChunk() throws IOException {
132         // Write the final chunk.
133         this.lineBuffer.clear();
134         this.lineBuffer.append('0');
135         this.buffer.writeLine(this.lineBuffer, this.outputStream);
136         writeTrailers();
137         this.lineBuffer.clear();
138         this.buffer.writeLine(this.lineBuffer, this.outputStream);
139     }
140 
141     private void writeTrailers() throws IOException {
142         final List<? extends Header> trailers = this.trailerSupplier != null ? this.trailerSupplier.get() : null;
143         if (trailers != null) {
144             for (int i = 0; i < trailers.size(); i++) {
145                 final Header header = trailers.get(i);
146                 if (header instanceof FormattedHeader) {
147                     final CharArrayBuffer chbuffer = ((FormattedHeader) header).getBuffer();
148                     this.buffer.writeLine(chbuffer, this.outputStream);
149                 } else {
150                     this.lineBuffer.clear();
151                     BasicLineFormatter.INSTANCE.formatHeader(this.lineBuffer, header);
152                     this.buffer.writeLine(this.lineBuffer, this.outputStream);
153                 }
154             }
155         }
156     }
157 
158     // ----------------------------------------------------------- Public Methods
159     /**
160      * Must be called to ensure the internal cache is flushed and the closing
161      * chunk is written.
162      * @throws IOException in case of an I/O error
163      */
164     public void finish() throws IOException {
165         if (!this.wroteLastChunk) {
166             flushCache();
167             writeClosingChunk();
168             this.wroteLastChunk = true;
169         }
170     }
171 
172     // -------------------------------------------- OutputStream Methods
173     @Override
174     public void write(final int b) throws IOException {
175         if (this.closed) {
176             throw new StreamClosedException();
177         }
178         this.cache[this.cachePosition] = (byte) b;
179         this.cachePosition++;
180         if (this.cachePosition == this.cache.length) {
181             flushCache();
182         }
183     }
184 
185     /**
186      * Writes the array. If the array does not fit within the buffer, it is
187      * not split, but rather written out as one large chunk.
188      */
189     @Override
190     public void write(final byte[] b) throws IOException {
191         write(b, 0, b.length);
192     }
193 
194     /**
195      * Writes the array. If the array does not fit within the buffer, it is
196      * not split, but rather written out as one large chunk.
197      */
198     @Override
199     public void write(final byte[] src, final int off, final int len) throws IOException {
200         if (this.closed) {
201             throw new StreamClosedException();
202         }
203         if (len >= this.cache.length - this.cachePosition) {
204             flushCacheWithAppend(src, off, len);
205         } else {
206             System.arraycopy(src, off, cache, this.cachePosition, len);
207             this.cachePosition += len;
208         }
209     }
210 
211     /**
212      * Flushes the content buffer and the underlying stream.
213      */
214     @Override
215     public void flush() throws IOException {
216         flushCache();
217         this.buffer.flush(this.outputStream);
218     }
219 
220     /**
221      * Finishes writing to the underlying stream, but does NOT close the underlying stream.
222      */
223     @Override
224     public void close() throws IOException {
225         if (!this.closed) {
226             this.closed = true;
227             finish();
228             this.buffer.flush(this.outputStream);
229         }
230     }
231 }