/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation. For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.hc.core5.http.impl.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.StreamClosedException;
import org.apache.hc.core5.http.io.SessionInputBuffer;
import org.apache.hc.core5.util.Args;

/**
 * Input stream that cuts off after a defined number of bytes. This class
 * is used to receive content of HTTP messages where the end of the content
 * entity is determined by the value of the {@code Content-Length} header.
 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
 * long.
 * <p>
 * Note that this class NEVER closes the underlying stream, even when
 * {@link #close()} gets called. Instead, it will read until the "end" of
 * its limit on close, which allows for the seamless execution of subsequent
 * HTTP 1.1 requests, while not requiring the client to remember to read the
 * entire contents of the response.
 *
 * @since 4.0
 */
public class ContentLengthInputStream extends InputStream {

    /** Size of the scratch array used to discard bytes in {@link #close()} and {@link #skip(long)}. */
    private static final int BUFFER_SIZE = 2048;

    private final SessionInputBuffer buffer;
    private final InputStream inputStream;

    /**
     * The maximum number of bytes that can be read from the stream. Subsequent
     * read operations will return -1.
     */
    private final long contentLength;

    /** The current position, i.e. the number of content bytes consumed so far. */
    private long pos;

    /** True if the stream is closed. */
    private boolean closed;

    /**
     * Default constructor.
     *
     * @param buffer Session input buffer
     * @param inputStream Input stream
     * @param contentLength The maximum number of bytes that can be read from
     * the stream. Subsequent read operations will return -1.
     */
    public ContentLengthInputStream(final SessionInputBuffer buffer, final InputStream inputStream, final long contentLength) {
        super();
        this.buffer = Args.notNull(buffer, "Session input buffer");
        this.inputStream = Args.notNull(inputStream, "Input stream");
        this.contentLength = Args.notNegative(contentLength, "Content length");
    }

    /**
     * <p>Reads until the end of the known length of content.</p>
     *
     * <p>Does NOT close the underlying stream, but instead leaves it
     * primed to parse the next response. The stream is marked as closed
     * even if draining the remaining content fails.</p>
     *
     * @throws IOException If an IO problem occurs while draining the content.
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        try {
            if (pos < contentLength) {
                final byte[] discard = new byte[BUFFER_SIZE];
                while (read(discard) >= 0) {
                    // keep reading until the declared content length is consumed
                }
            }
        } finally {
            // Flag as closed only after draining: read() rejects calls on a
            // closed stream, so setting this earlier would break the loop above.
            closed = true;
        }
    }

    /**
     * Returns the smaller of the session buffer's reported length and the
     * number of content bytes that have not been read yet.
     *
     * @return The number of bytes that can be read without exceeding the
     * content length; never negative.
     * @throws IOException declared for compatibility with
     * {@link InputStream#available()}.
     */
    @Override
    public int available() throws IOException {
        final int len = this.buffer.length();
        // Take the minimum in long arithmetic and narrow afterwards: the
        // remaining length may exceed Integer.MAX_VALUE, and casting it to int
        // first would truncate it (possibly to a negative value).
        return (int) Math.min(len, this.contentLength - this.pos);
    }

    /**
     * Read the next byte from the stream
     *
     * @return The next byte or -1 if the end of stream has been reached.
     * @throws StreamClosedException If this stream has already been closed.
     * @throws ConnectionClosedException If the underlying data ends before
     * the declared content length has been received.
     * @throws IOException If an IO problem occurs
     * @see java.io.InputStream#read()
     */
    @Override
    public int read() throws IOException {
        if (closed) {
            throw new StreamClosedException();
        }

        if (pos >= contentLength) {
            return -1;
        }
        final int b = this.buffer.read(this.inputStream);
        if (b == -1) {
            // pos < contentLength is guaranteed here, so EOF is premature.
            throw new ConnectionClosedException(
                    "Premature end of Content-Length delimited message body (expected: %d; received: %d)",
                    contentLength, pos);
        }
        pos++;
        return b;
    }

    /**
     * Reads up to {@code len} bytes into the given array, never reading past
     * the declared content length.
     *
     * @param b The byte array to fill.
     * @param off Start filling at this position.
     * @param len The number of bytes to attempt to read.
     * @return The number of bytes read, or -1 if the end of content has been
     * reached.
     *
     * @throws StreamClosedException If this stream has already been closed.
     * @throws ConnectionClosedException If the underlying data ends before
     * the declared content length has been received.
     * @throws java.io.IOException Should an error occur on the wrapped stream.
     */
    @Override
    public int read(final byte[] b, final int off, final int len) throws java.io.IOException {
        if (closed) {
            throw new StreamClosedException();
        }

        if (pos >= contentLength) {
            return -1;
        }

        // Cap the request at the remaining content. Comparing against
        // (contentLength - pos) instead of computing (pos + len) avoids long
        // overflow for content lengths close to Long.MAX_VALUE. The result is
        // never larger than len, so the narrowing cast is safe.
        final int chunk = (int) Math.min(len, contentLength - pos);
        final int count = this.buffer.read(b, off, chunk, this.inputStream);
        if (count == -1) {
            // pos < contentLength is guaranteed here, so EOF is premature.
            throw new ConnectionClosedException(
                    "Premature end of Content-Length delimited message body (expected: %d; received: %d)",
                    contentLength, pos);
        }
        if (count > 0) {
            pos += count;
        }
        return count;
    }

    /**
     * Read more bytes from the stream.
     *
     * @param b The byte array to put the new data in.
     * @return The number of bytes read into the buffer, or -1 if the end of
     * content has been reached.
     * @throws IOException If an IO problem occurs
     * @see java.io.InputStream#read(byte[])
     */
    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Skips and discards a number of bytes from the input stream. Bytes are
     * skipped by reading them, so at most the remaining content is skipped.
     *
     * @param n The number of bytes to skip.
     * @return The actual number of bytes skipped; {@code 0} if no bytes
     * are skipped.
     * @throws IOException If an error occurs while skipping bytes.
     * @see InputStream#skip(long)
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        // make sure we don't skip more bytes than are still available
        long remaining = Math.min(n, this.contentLength - this.pos);
        if (remaining <= 0) {
            // nothing left to skip; avoid allocating the scratch array
            return 0;
        }
        final byte[] discard = new byte[BUFFER_SIZE];
        // skip and keep track of the bytes actually skipped
        long count = 0;
        while (remaining > 0) {
            final int readLen = read(discard, 0, (int) Math.min(BUFFER_SIZE, remaining));
            if (readLen == -1) {
                break;
            }
            count += readLen;
            remaining -= readLen;
        }
        return count;
    }
}