View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.input;
18  
19  import static org.apache.commons.io.IOUtils.EOF;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PipedInputStream;
24  import java.io.PipedOutputStream;
25  import java.time.Duration;
26  import java.util.Objects;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.io.build.AbstractStreamBuilder;
32  import org.apache.commons.io.output.QueueOutputStream;
33  
34  /**
35   * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
36   * <p>
37   * To build an instance, use {@link Builder}.
38   * </p>
39   * <p>
40   * Example usage:
41   * </p>
42   * <pre>
43   * QueueInputStream inputStream = new QueueInputStream();
44   * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
45   *
46   * outputStream.write("hello world".getBytes(UTF_8));
47   * inputStream.read();
48   * </pre>
49   * <p>
50   * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
51   * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
52   * </p>
53   * <p>
54   * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
55   * {@link IOException}.
56   * </p>
57   *
58   * @see Builder
59   * @see QueueOutputStream
60   * @since 2.9.0
61   */
62  public class QueueInputStream extends InputStream {
63  
64      // @formatter:off
65      /**
66       * Builds a new {@link QueueInputStream}.
67       *
68       * <p>
69       * For example:
70       * </p>
71       * <pre>{@code
72       * QueueInputStream s = QueueInputStream.builder()
73       *   .setBlockingQueue(new LinkedBlockingQueue<>())
74       *   .setTimeout(Duration.ZERO)
75       *   .get();}
76       * </pre>
77       *
78       * @see #get()
79       * @since 2.12.0
80       */
81      // @formatter:on
82      public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
83  
84          private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
85          private Duration timeout = Duration.ZERO;
86  
87          /**
88           * Builds a new {@link QueueInputStream}.
89           * <p>
90           * This builder use the following aspects:
91           * </p>
92           * <ul>
93           * <li>{@link #setBlockingQueue(BlockingQueue)}</li>
94           * <li>timeout</li>
95           * </ul>
96           *
97           * @return a new instance.
98           */
99          @Override
100         public QueueInputStream get() {
101             return new QueueInputStream(blockingQueue, timeout);
102         }
103 
104         /**
105          * Sets backing queue for the stream.
106          *
107          * @param blockingQueue backing queue for the stream.
108          * @return this
109          */
110         public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
111             this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
112             return this;
113         }
114 
115         /**
116          * Sets the polling timeout.
117          *
118          * @param timeout the polling timeout.
119          * @return this.
120          */
121         public Builder setTimeout(final Duration timeout) {
122             if (timeout != null && timeout.toNanos() < 0) {
123                 throw new IllegalArgumentException("timeout must not be negative");
124             }
125             this.timeout = timeout != null ? timeout : Duration.ZERO;
126             return this;
127         }
128 
129     }
130 
131     /**
132      * Constructs a new {@link Builder}.
133      *
134      * @return a new {@link Builder}.
135      * @since 2.12.0
136      */
137     public static Builder builder() {
138         return new Builder();
139     }
140 
141     private final BlockingQueue<Integer> blockingQueue;
142 
143     private final long timeoutNanos;
144 
145     /**
146      * Constructs a new instance with no limit to its internal queue size and zero timeout.
147      */
148     public QueueInputStream() {
149         this(new LinkedBlockingQueue<>());
150     }
151 
152     /**
153      * Constructs a new instance with given queue and zero timeout.
154      *
155      * @param blockingQueue backing queue for the stream.
156      * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
157      */
158     @Deprecated
159     public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
160         this(blockingQueue, Duration.ZERO);
161     }
162 
163     /**
164      * Constructs a new instance with given queue and timeout.
165      *
166      * @param blockingQueue backing queue for the stream.
167      * @param timeout       how long to wait before giving up when polling the queue.
168      */
169     private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) {
170         this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
171         this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos();
172     }
173 
174     /**
175      * Gets the blocking queue.
176      *
177      * @return the blocking queue.
178      */
179     BlockingQueue<Integer> getBlockingQueue() {
180         return blockingQueue;
181     }
182 
183     /**
184      * Gets the timeout duration.
185      *
186      * @return the timeout duration.
187      */
188     Duration getTimeout() {
189         return Duration.ofNanos(timeoutNanos);
190     }
191 
192     /**
193      * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
194      *
195      * @return QueueOutputStream connected to this stream.
196      */
197     public QueueOutputStream newQueueOutputStream() {
198         return new QueueOutputStream(blockingQueue);
199     }
200 
201     /**
202      * Reads and returns a single byte.
203      *
204      * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
205      * @throws IllegalStateException if thread is interrupted while waiting.
206      */
207     @Override
208     public int read() {
209         try {
210             final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
211             return value == null ? EOF : 0xFF & value;
212         } catch (final InterruptedException e) {
213             Thread.currentThread().interrupt();
214             // throw runtime unchecked exception to maintain signature backward-compatibility of
215             // this read method, which does not declare IOException
216             throw new IllegalStateException(e);
217         }
218     }
219 
220 }