1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.apache.commons.io.input;
15
16 import static org.apache.commons.io.IOUtils.EOF;
17
18
19 import java.io.EOFException;
20 import java.io.FilterInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24 import java.nio.ByteBuffer;
25 import java.util.Objects;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.ReentrantLock;
32
33 import org.apache.commons.io.build.AbstractStreamBuilder;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 public class ReadAheadInputStream extends FilterInputStream {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {
71
72 private ExecutorService executorService;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 @SuppressWarnings("resource")
96 @Override
97 public ReadAheadInputStream get() throws IOException {
98 return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
99 executorService == null);
100 }
101
102
103
104
105
106
107
108 public Builder setExecutorService(final ExecutorService executorService) {
109 this.executorService = executorService;
110 return this;
111 }
112
113 }
114
/** Per-thread one-byte scratch array used by {@link #read()} so that single-byte reads do not allocate a new array on every call. */
private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);
116
117
118
119
120
121
122
123 public static Builder builder() {
124 return new Builder();
125 }
126
127
128
129
130
131
132
133 private static Thread newDaemonThread(final Runnable r) {
134 final Thread thread = new Thread(r, "commons-io-read-ahead");
135 thread.setDaemon(true);
136 return thread;
137 }
138
139
140
141
142
143
144 private static ExecutorService newExecutorService() {
145 return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
146 }
147
/** Lock taken around changes to the state flags and buffers below that are shared with the background read task. */
private final ReentrantLock stateChangeLock = new ReentrantLock();

/** Buffer that {@code read} and {@code skip} calls consume from. */
private ByteBuffer activeBuffer;

/** Buffer whose backing array the background task fills; swapped with {@link #activeBuffer} once the active buffer is drained. */
private ByteBuffer readAheadBuffer;

/** Set by the background task when the underlying read returns a negative count or throws {@link EOFException}. */
private boolean endOfStream;

/** True from the moment {@code readAsync()} schedules a background read until that task finishes or finds the stream closed. */
private boolean readInProgress;

/** Set when the background read failed with an exception other than {@link EOFException}. */
private boolean readAborted;

/** The exception recorded together with {@link #readAborted}; rethrown to the caller by {@code checkReadException()}. */
private Throwable readException;

/** Set by {@link #close()}; makes later {@code close()} calls no-ops and stops an already scheduled background task from reading. */
private boolean isClosed;

/**
 * Set by {@link #close()} when no background read is running at that moment, so that the background task does not also close the underlying
 * stream.
 */
private boolean isUnderlyingInputStreamBeingClosed;

/** True while the background task is performing its read on the underlying stream; cleared in {@code closeUnderlyingInputStreamIfNecessary()}. */
private boolean isReading;

/** True while a caller is inside {@code waitForAsyncReadComplete()}; the background task polls it to stop filling the buffer early. */
private final AtomicBoolean isWaiting = new AtomicBoolean();

/** Executor that runs the background read tasks. */
private final ExecutorService executorService;

/** Whether {@link #close()} shuts down {@link #executorService}; true when this class created the executor itself. */
private final boolean shutdownExecutorService;

/** Condition on {@link #stateChangeLock} that is signalled when a background read finishes. */
private final Condition asyncReadComplete = stateChangeLock.newCondition();
191
192
193
194
195
196
197
198
/**
 * Constructs an instance that reads ahead on a newly created single-thread daemon executor; that executor is shut down when this stream is closed.
 *
 * @param inputStream       the underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes the size of each of the two internal buffers, must be greater than 0.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
    this(inputStream, bufferSizeInBytes, newExecutorService(), true);
}
203
204
205
206
207
208
209
210
211
/**
 * Constructs an instance that reads ahead on the given executor; the executor is not shut down when this stream is closed.
 *
 * @param inputStream       the underlying input stream, must not be {@code null}.
 * @param bufferSizeInBytes the size of each of the two internal buffers, must be greater than 0.
 * @param executorService   the executor that runs the read-ahead tasks, must not be {@code null}.
 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
 */
@Deprecated
public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
    this(inputStream, bufferSizeInBytes, executorService, false);
}
216
217
218
219
220
221
222
223
224
225 private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
226 final boolean shutdownExecutorService) {
227 super(Objects.requireNonNull(inputStream, "inputStream"));
228 if (bufferSizeInBytes <= 0) {
229 throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
230 }
231 this.executorService = Objects.requireNonNull(executorService, "executorService");
232 this.shutdownExecutorService = shutdownExecutorService;
233 this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
234 this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
235 this.activeBuffer.flip();
236 this.readAheadBuffer.flip();
237 }
238
239 @Override
240 public int available() throws IOException {
241 stateChangeLock.lock();
242
243 try {
244 return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
245 } finally {
246 stateChangeLock.unlock();
247 }
248 }
249
250 private void checkReadException() throws IOException {
251 if (readAborted) {
252 if (readException instanceof IOException) {
253 throw (IOException) readException;
254 }
255 throw new IOException(readException);
256 }
257 }
258
259 @Override
260 public void close() throws IOException {
261 boolean isSafeToCloseUnderlyingInputStream = false;
262 stateChangeLock.lock();
263 try {
264 if (isClosed) {
265 return;
266 }
267 isClosed = true;
268 if (!isReading) {
269
270 isSafeToCloseUnderlyingInputStream = true;
271
272 isUnderlyingInputStreamBeingClosed = true;
273 }
274 } finally {
275 stateChangeLock.unlock();
276 }
277
278 if (shutdownExecutorService) {
279 try {
280 executorService.shutdownNow();
281 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
282 } catch (final InterruptedException e) {
283 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
284 iio.initCause(e);
285 throw iio;
286 } finally {
287 if (isSafeToCloseUnderlyingInputStream) {
288 super.close();
289 }
290 }
291 }
292 }
293
294 private void closeUnderlyingInputStreamIfNecessary() {
295 boolean needToCloseUnderlyingInputStream = false;
296 stateChangeLock.lock();
297 try {
298 isReading = false;
299 if (isClosed && !isUnderlyingInputStreamBeingClosed) {
300
301 needToCloseUnderlyingInputStream = true;
302 }
303 } finally {
304 stateChangeLock.unlock();
305 }
306 if (needToCloseUnderlyingInputStream) {
307 try {
308 super.close();
309 } catch (final IOException ignored) {
310
311 }
312 }
313 }
314
315 private boolean isEndOfStream() {
316 return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
317 }
318
319 @Override
320 public int read() throws IOException {
321 if (activeBuffer.hasRemaining()) {
322
323 return activeBuffer.get() & 0xFF;
324 }
325 final byte[] oneByteArray = BYTE_ARRAY_1.get();
326 oneByteArray[0] = 0;
327 return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
328 }
329
330 @Override
331 public int read(final byte[] b, final int offset, int len) throws IOException {
332 if (offset < 0 || len < 0 || len > b.length - offset) {
333 throw new IndexOutOfBoundsException();
334 }
335 if (len == 0) {
336 return 0;
337 }
338
339 if (!activeBuffer.hasRemaining()) {
340
341 stateChangeLock.lock();
342 try {
343 waitForAsyncReadComplete();
344 if (!readAheadBuffer.hasRemaining()) {
345
346 readAsync();
347 waitForAsyncReadComplete();
348 if (isEndOfStream()) {
349 return EOF;
350 }
351 }
352
353 swapBuffers();
354
355 readAsync();
356 } finally {
357 stateChangeLock.unlock();
358 }
359 }
360 len = Math.min(len, activeBuffer.remaining());
361 activeBuffer.get(b, offset, len);
362
363 return len;
364 }
365
366
367
368
369
370
371 private void readAsync() throws IOException {
372 stateChangeLock.lock();
373 final byte[] arr;
374 try {
375 arr = readAheadBuffer.array();
376 if (endOfStream || readInProgress) {
377 return;
378 }
379 checkReadException();
380 readAheadBuffer.position(0);
381 readAheadBuffer.flip();
382 readInProgress = true;
383 } finally {
384 stateChangeLock.unlock();
385 }
386 executorService.execute(() -> {
387 stateChangeLock.lock();
388 try {
389 if (isClosed) {
390 readInProgress = false;
391 return;
392 }
393
394
395 isReading = true;
396 } finally {
397 stateChangeLock.unlock();
398 }
399
400
401
402
403
404
405
406
407
408
409 int read = 0;
410 int off = 0, len = arr.length;
411 Throwable exception = null;
412 try {
413
414
415 do {
416 read = in.read(arr, off, len);
417 if (read <= 0) {
418 break;
419 }
420 off += read;
421 len -= read;
422 } while (len > 0 && !isWaiting.get());
423 } catch (final Throwable ex) {
424 exception = ex;
425 if (ex instanceof Error) {
426
427
428 throw (Error) ex;
429 }
430 } finally {
431 stateChangeLock.lock();
432 try {
433 readAheadBuffer.limit(off);
434 if (read < 0 || exception instanceof EOFException) {
435 endOfStream = true;
436 } else if (exception != null) {
437 readAborted = true;
438 readException = exception;
439 }
440 readInProgress = false;
441 signalAsyncReadComplete();
442 } finally {
443 stateChangeLock.unlock();
444 }
445 closeUnderlyingInputStreamIfNecessary();
446 }
447 });
448 }
449
450 private void signalAsyncReadComplete() {
451 stateChangeLock.lock();
452 try {
453 asyncReadComplete.signalAll();
454 } finally {
455 stateChangeLock.unlock();
456 }
457 }
458
459 @Override
460 public long skip(final long n) throws IOException {
461 if (n <= 0L) {
462 return 0L;
463 }
464 if (n <= activeBuffer.remaining()) {
465
466 activeBuffer.position((int) n + activeBuffer.position());
467 return n;
468 }
469 stateChangeLock.lock();
470 final long skipped;
471 try {
472 skipped = skipInternal(n);
473 } finally {
474 stateChangeLock.unlock();
475 }
476 return skipped;
477 }
478
479
480
481
482
483
484
485
486
487 private long skipInternal(final long n) throws IOException {
488 assert stateChangeLock.isLocked();
489 waitForAsyncReadComplete();
490 if (isEndOfStream()) {
491 return 0;
492 }
493 if (available() >= n) {
494
495 int toSkip = (int) n;
496
497 toSkip -= activeBuffer.remaining();
498 assert toSkip > 0;
499 activeBuffer.position(0);
500 activeBuffer.flip();
501 readAheadBuffer.position(toSkip + readAheadBuffer.position());
502 swapBuffers();
503
504 readAsync();
505 return n;
506 }
507 final int skippedBytes = available();
508 final long toSkip = n - skippedBytes;
509 activeBuffer.position(0);
510 activeBuffer.flip();
511 readAheadBuffer.position(0);
512 readAheadBuffer.flip();
513 final long skippedFromInputStream = in.skip(toSkip);
514 readAsync();
515 return skippedBytes + skippedFromInputStream;
516 }
517
518
519
520
521 private void swapBuffers() {
522 final ByteBuffer temp = activeBuffer;
523 activeBuffer = readAheadBuffer;
524 readAheadBuffer = temp;
525 }
526
527 private void waitForAsyncReadComplete() throws IOException {
528 stateChangeLock.lock();
529 try {
530 isWaiting.set(true);
531
532
533 while (readInProgress) {
534 asyncReadComplete.await();
535 }
536 } catch (final InterruptedException e) {
537 final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
538 iio.initCause(e);
539 throw iio;
540 } finally {
541 try {
542 isWaiting.set(false);
543 } finally {
544 stateChangeLock.unlock();
545 }
546 }
547 checkReadException();
548 }
549 }