1 /*
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14 package org.apache.commons.io.input;
15
import static org.apache.commons.io.IOUtils.EOF;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.build.AbstractStreamBuilder;
29
30 /**
 * {@link InputStream} implementation which uses a direct buffer to read a file, avoiding the extra copy of data between Java and native memory which happens
 * when using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in the JDK: {@code sun.nio.ch.ChannelInputStream}
 * supports reading a file using NIO, but does not support buffering.
34 * <p>
35 * To build an instance, use {@link Builder}.
36 * </p>
37 * <p>
38 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
39 * </p>
40 *
41 * @see Builder
42 * @since 2.9.0
43 */
44 public final class BufferedFileChannelInputStream extends InputStream {
45
46 // @formatter:off
47 /**
48 * Builds a new {@link BufferedFileChannelInputStream}.
49 *
50 * <p>
51 * Using File IO:
52 * </p>
53 * <pre>{@code
54 * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
55 * .setFile(file)
56 * .setBufferSize(4096)
57 * .get();}
58 * </pre>
59 * <p>
60 * Using NIO Path:
61 * </p>
62 * <pre>{@code
63 * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
64 * .setPath(path)
65 * .setBufferSize(4096)
66 * .get();}
67 * </pre>
68 *
69 * @see #get()
70 * @since 2.12.0
71 */
72 // @formatter:on
73 public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
74
75 /**
76 * Builds a new {@link BufferedFileChannelInputStream}.
77 * <p>
78 * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
79 * </p>
80 * <p>
81 * This builder use the following aspects:
82 * </p>
83 * <ul>
84 * <li>{@link #getInputStream()}</li>
85 * <li>{@link #getBufferSize()}</li>
86 * </ul>
87 *
88 * @return a new instance.
89 * @throws IllegalStateException if the {@code origin} is {@code null}.
90 * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
91 * @throws IOException If an I/O error occurs
92 * @see #getPath()
93 * @see #getBufferSize()
94 */
95 @Override
96 public BufferedFileChannelInputStream get() throws IOException {
97 return new BufferedFileChannelInputStream(getPath(), getBufferSize());
98 }
99
100 }
101
102 /**
103 * Constructs a new {@link Builder}.
104 *
105 * @return a new {@link Builder}.
106 * @since 2.12.0
107 */
108 public static Builder builder() {
109 return new Builder();
110 }
111
112 private final ByteBuffer byteBuffer;
113
114 private final FileChannel fileChannel;
115
116 /**
117 * Constructs a new instance for the given File.
118 *
119 * @param file The file to stream.
120 * @throws IOException If an I/O error occurs
121 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
122 */
123 @Deprecated
124 public BufferedFileChannelInputStream(final File file) throws IOException {
125 this(file, IOUtils.DEFAULT_BUFFER_SIZE);
126 }
127
128 /**
129 * Constructs a new instance for the given File and buffer size.
130 *
131 * @param file The file to stream.
132 * @param bufferSize buffer size.
133 * @throws IOException If an I/O error occurs
134 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
135 */
136 @Deprecated
137 public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
138 this(file.toPath(), bufferSize);
139 }
140
141 /**
142 * Constructs a new instance for the given Path.
143 *
144 * @param path The path to stream.
145 * @throws IOException If an I/O error occurs
146 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
147 */
148 @Deprecated
149 public BufferedFileChannelInputStream(final Path path) throws IOException {
150 this(path, IOUtils.DEFAULT_BUFFER_SIZE);
151 }
152
153 /**
154 * Constructs a new instance for the given Path and buffer size.
155 *
156 * @param path The path to stream.
157 * @param bufferSize buffer size.
158 * @throws IOException If an I/O error occurs
159 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
160 */
161 @Deprecated
162 public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
163 Objects.requireNonNull(path, "path");
164 fileChannel = FileChannel.open(path, StandardOpenOption.READ);
165 byteBuffer = ByteBuffer.allocateDirect(bufferSize);
166 byteBuffer.flip();
167 }
168
169 @Override
170 public synchronized int available() throws IOException {
171 return byteBuffer.remaining();
172 }
173
174 /**
175 * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
176 * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
177 * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
178 * API to manually dispose of these kinds of buffers.
179 *
180 * @param buffer the buffer to clean.
181 */
182 private void clean(final ByteBuffer buffer) {
183 if (buffer.isDirect()) {
184 cleanDirectBuffer(buffer);
185 }
186 }
187
188 /**
189 * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
190 * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
191 * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
192 * reflection.
193 *
194 * @param buffer the buffer to clean. must be a DirectBuffer.
195 */
196 private void cleanDirectBuffer(final ByteBuffer buffer) {
197 if (ByteBufferCleaner.isSupported()) {
198 ByteBufferCleaner.clean(buffer);
199 }
200 }
201
202 @Override
203 public synchronized void close() throws IOException {
204 try {
205 fileChannel.close();
206 } finally {
207 clean(byteBuffer);
208 }
209 }
210
211 @Override
212 public synchronized int read() throws IOException {
213 if (!refill()) {
214 return EOF;
215 }
216 return byteBuffer.get() & 0xFF;
217 }
218
219 @Override
220 public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
221 if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
222 throw new IndexOutOfBoundsException();
223 }
224 if (!refill()) {
225 return EOF;
226 }
227 len = Math.min(len, byteBuffer.remaining());
228 byteBuffer.get(b, offset, len);
229 return len;
230 }
231
232 /**
233 * Checks whether data is left to be read from the input stream.
234 *
235 * @return true if data is left, false otherwise
236 * @throws IOException if an I/O error occurs.
237 */
238 private boolean refill() throws IOException {
239 if (!byteBuffer.hasRemaining()) {
240 byteBuffer.clear();
241 int nRead = 0;
242 while (nRead == 0) {
243 nRead = fileChannel.read(byteBuffer);
244 }
245 byteBuffer.flip();
246 return nRead >= 0;
247 }
248 return true;
249 }
250
251 @Override
252 public synchronized long skip(final long n) throws IOException {
253 if (n <= 0L) {
254 return 0L;
255 }
256 if (byteBuffer.remaining() >= n) {
257 // The buffered content is enough to skip
258 byteBuffer.position(byteBuffer.position() + (int) n);
259 return n;
260 }
261 final long skippedFromBuffer = byteBuffer.remaining();
262 final long toSkipFromFileChannel = n - skippedFromBuffer;
263 // Discard everything we have read in the buffer.
264 byteBuffer.position(0);
265 byteBuffer.flip();
266 return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
267 }
268
269 private long skipFromFileChannel(final long n) throws IOException {
270 final long currentFilePosition = fileChannel.position();
271 final long size = fileChannel.size();
272 if (n > size - currentFilePosition) {
273 fileChannel.position(size);
274 return size - currentFilePosition;
275 }
276 fileChannel.position(currentFilePosition + n);
277 return n;
278 }
279
280 }