ReadAheadInputStream.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

// import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream. It does so by maintaining two buffers: an active buffer and a
 * read ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read ahead buffer is used to asynchronously
 * read from the underlying input stream. When the current active buffer is exhausted, we swap the two buffers so that we can start reading from the read
 * ahead buffer without being blocked by disk I/O, and schedule the next asynchronous read into the now empty buffer.
 * <p>
 * This class assumes a single reader thread: {@link #read()} consumes the active buffer without taking the internal lock, which only coordinates the reader
 * with the background read-ahead task.
 * </p>
 * <p>
 * Closing this stream closes the underlying input stream, either immediately or, if a read-ahead task is reading at that moment, as soon as that task
 * finishes. An {@link ExecutorService} created by this class is shut down on close; an executor service supplied by the caller is left running.
 * </p>
 * <p>
 * To build an instance, use {@link Builder}.
 * </p>
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
 * </p>
 *
 * @see Builder
 * @since 2.9.0
 */
public class ReadAheadInputStream extends FilterInputStream {

    // @formatter:off
    /**
     * Builds a new {@link ReadAheadInputStream}.
     *
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * ReadAheadInputStream s = ReadAheadInputStream.builder()
     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
     *   .get();}
     * </pre>
     *
     * @see #get()
     * @since 2.12.0
     */
    // @formatter:on
    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

        /** Caller-supplied executor, or {@code null} to have the stream create (and later shut down) its own. */
        private ExecutorService executorService;

        /**
         * Builds a new {@link ReadAheadInputStream}.
         * <p>
         * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception.
         * </p>
         * <p>
         * This builder uses the following aspects:
         * </p>
         * <ul>
         * <li>{@link #getInputStream()}</li>
         * <li>{@link #getBufferSize()}</li>
         * <li>{@link ExecutorService}</li>
         * </ul>
         * <p>
         * If no executor service was set, a new single-thread executor running a daemon thread is created; that executor is shut down when the stream is
         * closed.
         * </p>
         *
         * @return a new instance.
         * @throws IllegalStateException         if the {@code origin} is {@code null}.
         * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
         * @throws IOException                   if an I/O error occurs.
         * @see #getInputStream()
         * @see #getBufferSize()
         */
        @SuppressWarnings("resource")
        @Override
        public ReadAheadInputStream get() throws IOException {
            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
                    executorService == null);
        }

        /**
         * Sets the executor service for the read-ahead thread.
         * <p>
         * A caller-supplied executor service is not shut down when the stream is closed. Passing {@code null} restores the default, which creates a
         * private executor that is shut down on close.
         * </p>
         *
         * @param executorService the executor service for the read-ahead thread.
         * @return this
         */
        public Builder setExecutorService(final ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

    }

    /** Per-thread scratch array used by {@link #read()} to delegate to {@link #read(byte[], int, int)}. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Constructs a new daemon thread.
     *
     * @param r the thread's runnable.
     * @return a new daemon thread.
     */
    private static Thread newDaemonThread(final Runnable r) {
        final Thread thread = new Thread(r, "commons-io-read-ahead");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Constructs a new single-thread executor service whose thread is a daemon thread.
     *
     * @return a new daemon executor service.
     */
    private static ExecutorService newExecutorService() {
        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
    }

    /** Guards every field marked {@code @GuardedBy("stateChangeLock")} below. */
    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    // Buffer the reader consumes from; read() also reads it without the lock (single-reader assumption).
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    // Buffer the background task fills; swapped with activeBuffer once the latter is exhausted.
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    // true once the underlying stream reported EOF (or threw EOFException).
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    // The exception that aborted reading; rethrown by checkReadException().
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task currently reading from the underlying input stream.
    private boolean isReading;

    // Whether there is a reader waiting for data; lets the background task return a partially filled buffer early.
    private final AtomicBoolean isWaiting = new AtomicBoolean();

    // Executor running the read-ahead tasks.
    private final ExecutorService executorService;

    // Whether close() owns and must shut down executorService.
    private final boolean shutdownExecutorService;

    // Signalled (under stateChangeLock) whenever readInProgress goes back to false.
    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance with the specified buffer size, using a private daemon executor that is shut down on close.
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @throws NullPointerException     if {@code inputStream} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and caller-owned executor service (which is not shut down on close).
     *
     * @param inputStream       The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService   An executor service for the read-ahead thread.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     *
     * @param inputStream             The underlying input stream.
     * @param bufferSizeInBytes       The buffer size.
     * @param executorService         An executor service for the read-ahead thread.
     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
     * @throws NullPointerException     if {@code inputStream} or {@code executorService} is {@code null}.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive.
     */
    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
            final boolean shutdownExecutorService) {
        super(Objects.requireNonNull(inputStream, "inputStream"));
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
        }
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        this.shutdownExecutorService = shutdownExecutorService;
        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        // Flip both freshly allocated buffers so they start out empty (limit == 0) in read mode.
        this.activeBuffer.flip();
        this.readAheadBuffer.flip();
    }

    /**
     * Returns the number of bytes buffered in the active and read ahead buffers, capped at {@link Integer#MAX_VALUE}.
     *
     * @return the number of buffered bytes.
     * @throws IOException never thrown by this implementation; declared by {@link InputStream#available()}.
     */
    @Override
    public int available() throws IOException {
        stateChangeLock.lock();
        // Make sure we have no integer overflow.
        try {
            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Rethrows the failure recorded by a previous read-ahead task, if any. The caller is expected to hold {@code stateChangeLock}.
     *
     * @throws IOException the recorded exception, or an {@link IOException} wrapping it when it is not an {@link IOException}.
     */
    private void checkReadException() throws IOException {
        if (readAborted) {
            if (readException instanceof IOException) {
                throw (IOException) readException;
            }
            throw new IOException(readException);
        }
    }

    /**
     * Closes this stream and the underlying input stream.
     * <p>
     * If a read-ahead task is reading from the underlying stream at this moment, that task closes it when it finishes; otherwise it is closed here. The
     * executor service is shut down (and awaited) only if this stream created it.
     * </p>
     *
     * @throws InterruptedIOException if interrupted while waiting for the owned executor service to terminate.
     * @throws IOException            if closing the underlying input stream fails.
     */
    @Override
    public void close() throws IOException {
        boolean isSafeToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            if (isClosed) {
                return;
            }
            isClosed = true;
            if (!isReading) {
                // Nobody is reading, so we can close the underlying input stream in this method.
                isSafeToCloseUnderlyingInputStream = true;
                // Flip this to make sure the read ahead task will not close the underlying input stream.
                isUnderlyingInputStreamBeingClosed = true;
            }
        } finally {
            stateChangeLock.unlock();
        }

        try {
            if (shutdownExecutorService) {
                try {
                    executorService.shutdownNow();
                    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                } catch (final InterruptedException e) {
                    final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
                    iio.initCause(e);
                    throw iio;
                }
            }
        } finally {
            // Must run regardless of who owns the executor: once isUnderlyingInputStreamBeingClosed is set above,
            // the read-ahead task will never close the stream, so this is the only place it can be closed.
            if (isSafeToCloseUnderlyingInputStream) {
                super.close();
            }
        }
    }

    /**
     * Called by a read-ahead task when it finishes; marks it as no longer reading and closes the underlying stream if {@link #close()} ran while the
     * task was reading (and therefore deferred the close to it). Failures to close are ignored.
     */
    private void closeUnderlyingInputStreamIfNecessary() {
        boolean needToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            isReading = false;
            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
                // close method cannot close underlyingInputStream because we were reading.
                needToCloseUnderlyingInputStream = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        if (needToCloseUnderlyingInputStream) {
            try {
                super.close();
            } catch (final IOException ignored) {
                // TODO Rethrow as UncheckedIOException?
            }
        }
    }

    /**
     * Tests whether both buffers are drained and the underlying stream has reported EOF. The caller is expected to hold {@code stateChangeLock}.
     *
     * @return whether no more data can be returned.
     */
    private boolean isEndOfStream() {
        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
    }

    /**
     * Reads one byte, served directly from the active buffer when possible.
     *
     * @return the byte as an {@code int} in {@code 0..255}, or {@code EOF} at end of stream.
     * @throws IOException if the read-ahead task failed or waiting was interrupted.
     */
    @Override
    public int read() throws IOException {
        if (activeBuffer.hasRemaining()) {
            // short path - just get one byte.
            return activeBuffer.get() & 0xFF;
        }
        final byte[] oneByteArray = BYTE_ARRAY_1.get();
        oneByteArray[0] = 0;
        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes from the active buffer, first swapping in the read ahead buffer (waiting for it if necessary) when the active
     * buffer is exhausted.
     *
     * @param b      the destination array.
     * @param offset the start offset in {@code b}.
     * @param len    the maximum number of bytes to read.
     * @return the number of bytes read, or {@code EOF} at end of stream.
     * @throws IndexOutOfBoundsException if {@code offset} or {@code len} is out of range for {@code b}.
     * @throws IOException               if the read-ahead task failed or waiting was interrupted.
     */
    @Override
    public int read(final byte[] b, final int offset, int len) throws IOException {
        if (offset < 0 || len < 0 || len > b.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        if (!activeBuffer.hasRemaining()) {
            // No remaining in active buffer - lock and switch to write ahead buffer.
            stateChangeLock.lock();
            try {
                waitForAsyncReadComplete();
                if (!readAheadBuffer.hasRemaining()) {
                    // The first read.
                    readAsync();
                    waitForAsyncReadComplete();
                    if (isEndOfStream()) {
                        return EOF;
                    }
                }
                // Swap the newly read ahead buffer in place of empty active buffer.
                swapBuffers();
                // After swapping buffers, trigger another async read for read ahead buffer.
                readAsync();
            } finally {
                stateChangeLock.unlock();
            }
        }
        len = Math.min(len, activeBuffer.remaining());
        activeBuffer.get(b, offset, len);

        return len;
    }

    /**
     * Read data from underlyingInputStream to readAheadBuffer asynchronously. Does nothing if the stream already reached EOF or a read is already in
     * progress.
     *
     * @throws IOException                if a previous read-ahead task failed.
     * @throws RejectedExecutionException if the executor service refuses the task (for example, after it was shut down by {@link #close()}).
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // Empty the read ahead buffer (position 0, limit 0); the task sets the limit when it is done.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        try {
            executorService.execute(() -> fillReadAheadBuffer(arr));
        } catch (final RejectedExecutionException e) {
            // The task will never run, so nothing else would ever clear readInProgress; without this reset every later
            // waitForAsyncReadComplete() call would block forever.
            stateChangeLock.lock();
            try {
                readInProgress = false;
                signalAsyncReadComplete();
            } finally {
                stateChangeLock.unlock();
            }
            throw e;
        }
    }

    /**
     * Body of the read-ahead task: fills {@code arr} (the read ahead buffer's backing array) from the underlying stream and publishes the result.
     *
     * @param arr the backing array of the read ahead buffer.
     */
    private void fillReadAheadBuffer(final byte[] arr) {
        stateChangeLock.lock();
        try {
            if (isClosed) {
                readInProgress = false;
                // Wake any reader blocked in waitForAsyncReadComplete(); otherwise it would wait forever because this
                // early return skips the completion signal at the end of this method.
                signalAsyncReadComplete();
                return;
            }
            // Flip this so that the close method will not close the underlying input stream when we
            // are reading.
            isReading = true;
        } finally {
            stateChangeLock.unlock();
        }

        // Please note that it is safe to release the lock and read into the read ahead buffer
        // because either of following two conditions will hold:
        //
        // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
        //
        // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
        // for this async read to complete.
        //
        // So there is no race condition in both the situations.
        int read = 0;
        int off = 0;
        int len = arr.length;
        Throwable exception = null;
        try {
            // try to fill the read ahead buffer.
            // if a reader is waiting, possibly return early.
            do {
                read = in.read(arr, off, len);
                if (read <= 0) {
                    break;
                }
                off += read;
                len -= read;
            } while (len > 0 && !isWaiting.get());
        } catch (final Throwable ex) {
            exception = ex;
            if (ex instanceof Error) {
                // `readException` may not be reported to the user. Rethrow Error to make sure at least
                // The user can see Error in UncaughtExceptionHandler.
                throw (Error) ex;
            }
        } finally {
            stateChangeLock.lock();
            try {
                // Expose exactly the bytes that were read.
                readAheadBuffer.limit(off);
                if (read < 0 || exception instanceof EOFException) {
                    endOfStream = true;
                } else if (exception != null) {
                    readAborted = true;
                    readException = exception;
                }
                readInProgress = false;
                signalAsyncReadComplete();
            } finally {
                stateChangeLock.unlock();
            }
            closeUnderlyingInputStreamIfNecessary();
        }
    }

    /**
     * Wakes every thread waiting on {@code asyncReadComplete}.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Skips up to {@code n} bytes, consuming buffered data first.
     *
     * @param n the number of bytes to skip.
     * @return the number of bytes actually skipped; {@code 0} if {@code n} is not positive.
     * @throws IOException if the read-ahead task failed, waiting was interrupted, or the underlying skip fails.
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        if (n <= activeBuffer.remaining()) {
            // Only skipping from active buffer is sufficient
            activeBuffer.position((int) n + activeBuffer.position());
            return n;
        }
        stateChangeLock.lock();
        final long skipped;
        try {
            skipped = skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
        return skipped;
    }

    /**
     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
     * calling this function.
     *
     * @param n the number of bytes to be skipped; greater than the active buffer's remaining bytes.
     * @return the actual number of bytes skipped.
     * @throws IOException if an I/O error occurs.
     */
    private long skipInternal(final long n) throws IOException {
        assert stateChangeLock.isLocked();
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
            return 0;
        }
        if (available() >= n) {
            // we can skip from the internal buffers
            int toSkip = (int) n;
            // We need to skip from both active buffer and read ahead buffer
            toSkip -= activeBuffer.remaining();
            assert toSkip > 0; // skipping from activeBuffer already handled.
            activeBuffer.position(0);
            activeBuffer.flip();
            readAheadBuffer.position(toSkip + readAheadBuffer.position());
            swapBuffers();
            // Trigger async read to emptied read ahead buffer.
            readAsync();
            return n;
        }
        // Not enough buffered: drop both buffers and skip the rest on the underlying stream.
        final int skippedBytes = available();
        final long toSkip = n - skippedBytes;
        activeBuffer.position(0);
        activeBuffer.flip();
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        final long skippedFromInputStream = in.skip(toSkip);
        readAsync();
        return skippedBytes + skippedFromInputStream;
    }

    /**
     * Swaps the active and read ahead buffers.
     */
    private void swapBuffers() {
        final ByteBuffer temp = activeBuffer;
        activeBuffer = readAheadBuffer;
        readAheadBuffer = temp;
    }

    /**
     * Blocks until no read-ahead task is in progress, then rethrows any failure that task recorded.
     *
     * @throws InterruptedIOException if interrupted while waiting.
     * @throws IOException            if the read-ahead task failed.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            // Tell the background task to return early with whatever it has read so far.
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
            iio.initCause(e);
            throw iio;
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        checkReadException();
    }
}