View Javadoc
1   /*
2    * Licensed under the Apache License, Version 2.0 (the "License");
3    * you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at
5    *
6    *     http://www.apache.org/licenses/LICENSE-2.0
7    *
8    * Unless required by applicable law or agreed to in writing, software
9    * distributed under the License is distributed on an "AS IS" BASIS,
10   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11   * See the License for the specific language governing permissions and
12   * limitations under the License.
13   */
14  package org.apache.commons.io.input;
15  
16  import static org.apache.commons.io.IOUtils.EOF;
17  
18  import java.io.File;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.FileChannel;
23  import java.nio.file.Path;
24  import java.nio.file.StandardOpenOption;
25  import java.util.Objects;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.io.build.AbstractStreamBuilder;
29  
30  /**
31   * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
32   * using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
33   * reading a file using NIO, but does not support buffering.
34   * <p>
35   * To build an instance, use {@link Builder}.
36   * </p>
37   * <p>
38   * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
39   * </p>
40   *
41   * @see Builder
42   * @since 2.9.0
43   */
44  public final class BufferedFileChannelInputStream extends InputStream {
45  
46      // @formatter:off
47      /**
48       * Builds a new {@link BufferedFileChannelInputStream}.
49       *
50       * <p>
51       * Using File IO:
52       * </p>
53       * <pre>{@code
54       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
55       *   .setFile(file)
56       *   .setBufferSize(4096)
57       *   .get();}
58       * </pre>
59       * <p>
60       * Using NIO Path:
61       * </p>
62       * <pre>{@code
63       * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
64       *   .setPath(path)
65       *   .setBufferSize(4096)
66       *   .get();}
67       * </pre>
68       *
69       * @see #get()
70       * @since 2.12.0
71       */
72      // @formatter:on
73      public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
74  
75          /**
76           * Builds a new {@link BufferedFileChannelInputStream}.
77           * <p>
78           * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
79           * </p>
80           * <p>
81           * This builder use the following aspects:
82           * </p>
83           * <ul>
84           * <li>{@link #getInputStream()}</li>
85           * <li>{@link #getBufferSize()}</li>
86           * </ul>
87           *
88           * @return a new instance.
89           * @throws IllegalStateException         if the {@code origin} is {@code null}.
90           * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}.
91           * @throws IOException If an I/O error occurs
92           * @see #getPath()
93           * @see #getBufferSize()
94           */
95          @Override
96          public BufferedFileChannelInputStream get() throws IOException {
97              return new BufferedFileChannelInputStream(getPath(), getBufferSize());
98          }
99  
100     }
101 
102     /**
103      * Constructs a new {@link Builder}.
104      *
105      * @return a new {@link Builder}.
106      * @since 2.12.0
107      */
108     public static Builder builder() {
109         return new Builder();
110     }
111 
112     private final ByteBuffer byteBuffer;
113 
114     private final FileChannel fileChannel;
115 
116     /**
117      * Constructs a new instance for the given File.
118      *
119      * @param file The file to stream.
120      * @throws IOException If an I/O error occurs
121      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
122      */
123     @Deprecated
124     public BufferedFileChannelInputStream(final File file) throws IOException {
125         this(file, IOUtils.DEFAULT_BUFFER_SIZE);
126     }
127 
128     /**
129      * Constructs a new instance for the given File and buffer size.
130      *
131      * @param file       The file to stream.
132      * @param bufferSize buffer size.
133      * @throws IOException If an I/O error occurs
134      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
135      */
136     @Deprecated
137     public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
138         this(file.toPath(), bufferSize);
139     }
140 
141     /**
142      * Constructs a new instance for the given Path.
143      *
144      * @param path The path to stream.
145      * @throws IOException If an I/O error occurs
146      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
147      */
148     @Deprecated
149     public BufferedFileChannelInputStream(final Path path) throws IOException {
150         this(path, IOUtils.DEFAULT_BUFFER_SIZE);
151     }
152 
153     /**
154      * Constructs a new instance for the given Path and buffer size.
155      *
156      * @param path       The path to stream.
157      * @param bufferSize buffer size.
158      * @throws IOException If an I/O error occurs
159      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
160      */
161     @Deprecated
162     public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
163         Objects.requireNonNull(path, "path");
164         fileChannel = FileChannel.open(path, StandardOpenOption.READ);
165         byteBuffer = ByteBuffer.allocateDirect(bufferSize);
166         byteBuffer.flip();
167     }
168 
169     @Override
170     public synchronized int available() throws IOException {
171         return byteBuffer.remaining();
172     }
173 
174     /**
175      * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
176      * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
177      * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
178      * API to manually dispose of these kinds of buffers.
179      *
180      * @param buffer the buffer to clean.
181      */
182     private void clean(final ByteBuffer buffer) {
183         if (buffer.isDirect()) {
184             cleanDirectBuffer(buffer);
185         }
186     }
187 
188     /**
189      * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
190      * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
191      * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
192      * reflection.
193      *
194      * @param buffer the buffer to clean. must be a DirectBuffer.
195      */
196     private void cleanDirectBuffer(final ByteBuffer buffer) {
197         if (ByteBufferCleaner.isSupported()) {
198             ByteBufferCleaner.clean(buffer);
199         }
200     }
201 
202     @Override
203     public synchronized void close() throws IOException {
204         try {
205             fileChannel.close();
206         } finally {
207             clean(byteBuffer);
208         }
209     }
210 
211     @Override
212     public synchronized int read() throws IOException {
213         if (!refill()) {
214             return EOF;
215         }
216         return byteBuffer.get() & 0xFF;
217     }
218 
219     @Override
220     public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
221         if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
222             throw new IndexOutOfBoundsException();
223         }
224         if (!refill()) {
225             return EOF;
226         }
227         len = Math.min(len, byteBuffer.remaining());
228         byteBuffer.get(b, offset, len);
229         return len;
230     }
231 
232     /**
233      * Checks whether data is left to be read from the input stream.
234      *
235      * @return true if data is left, false otherwise
236      * @throws IOException if an I/O error occurs.
237      */
238     private boolean refill() throws IOException {
239         if (!byteBuffer.hasRemaining()) {
240             byteBuffer.clear();
241             int nRead = 0;
242             while (nRead == 0) {
243                 nRead = fileChannel.read(byteBuffer);
244             }
245             byteBuffer.flip();
246             return nRead >= 0;
247         }
248         return true;
249     }
250 
251     @Override
252     public synchronized long skip(final long n) throws IOException {
253         if (n <= 0L) {
254             return 0L;
255         }
256         if (byteBuffer.remaining() >= n) {
257             // The buffered content is enough to skip
258             byteBuffer.position(byteBuffer.position() + (int) n);
259             return n;
260         }
261         final long skippedFromBuffer = byteBuffer.remaining();
262         final long toSkipFromFileChannel = n - skippedFromBuffer;
263         // Discard everything we have read in the buffer.
264         byteBuffer.position(0);
265         byteBuffer.flip();
266         return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
267     }
268 
269     private long skipFromFileChannel(final long n) throws IOException {
270         final long currentFilePosition = fileChannel.position();
271         final long size = fileChannel.size();
272         if (n > size - currentFilePosition) {
273             fileChannel.position(size);
274             return size - currentFilePosition;
275         }
276         fileChannel.position(currentFilePosition + n);
277         return n;
278     }
279 
280 }