1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.io.output;
18
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.io.OutputStream;
22 import java.io.PipedInputStream;
23 import java.io.PipedOutputStream;
24 import java.util.Objects;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27
28 import org.apache.commons.io.input.QueueInputStream;
29
30 /**
31 * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's written in queue
32 * output stream.
33 * <p>
34 * Example usage:
35 * </p>
36 *
 * <pre>
 * QueueOutputStream outputStream = new QueueOutputStream();
 * QueueInputStream inputStream = outputStream.newQueueInputStream();
 *
 * outputStream.write("hello world".getBytes(UTF_8));
 * inputStream.read();
 * </pre>
44 *
 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a
 * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to the initial or current
 * thread. Instances can still be used long after the initial threads have exited.
48 * <p>
49 * Closing a {@link QueueOutputStream} has no effect. The methods in this class can be called after the stream has been
50 * closed without generating an {@link IOException}.
51 * </p>
52 *
53 * @see QueueInputStream
54 * @since 2.9.0
55 */
56 public class QueueOutputStream extends OutputStream {
57
58 private final BlockingQueue<Integer> blockingQueue;
59
60 /**
61 * Constructs a new instance with no limit to internal buffer size.
62 */
63 public QueueOutputStream() {
64 this(new LinkedBlockingQueue<>());
65 }
66
67 /**
68 * Constructs a new instance with given buffer.
69 *
70 * @param blockingQueue backing queue for the stream
71 */
72 public QueueOutputStream(final BlockingQueue<Integer> blockingQueue) {
73 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
74 }
75
76 /**
77 * Constructs a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the
78 * input stream.
79 *
80 * @return QueueInputStream connected to this stream
81 */
82 public QueueInputStream newQueueInputStream() {
83 return QueueInputStream.builder().setBlockingQueue(blockingQueue).get();
84 }
85
86 /**
87 * Writes a single byte.
88 *
89 * @throws InterruptedIOException if the thread is interrupted while writing to the queue.
90 */
91 @Override
92 public void write(final int b) throws InterruptedIOException {
93 try {
94 blockingQueue.put(0xFF & b);
95 } catch (final InterruptedException e) {
96 Thread.currentThread().interrupt();
97 final InterruptedIOException interruptedIoException = new InterruptedIOException();
98 interruptedIoException.initCause(e);
99 throw interruptedIoException;
100 }
101 }
102 }