1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertNotNull;
21
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.nio.charset.StandardCharsets;
25 import java.util.HashMap;
26 import java.util.Random;
27
28 import org.apache.commons.io.input.CharSequenceInputStream;
29 import org.apache.commons.io.input.DemuxInputStream;
30 import org.apache.commons.io.output.ByteArrayOutputStream;
31 import org.apache.commons.io.output.DemuxOutputStream;
32 import org.apache.commons.io.test.TestUtils;
33 import org.junit.jupiter.api.Test;
34
35
36
37
38 public class DemuxInputStreamTest {
39
40 private static final class ReaderThread extends Thread {
41 private final DemuxInputStream demuxInputStream;
42 private final InputStream inputStream;
43 private final StringBuffer stringBuffer = new StringBuffer();
44
45 ReaderThread(final String name, final InputStream input, final DemuxInputStream demux) {
46 super(name);
47 inputStream = input;
48 demuxInputStream = demux;
49 }
50
51 public String getData() {
52 return stringBuffer.toString();
53 }
54
55 @Override
56 public void run() {
57 demuxInputStream.bindStream(inputStream);
58
59 try {
60 int ch = demuxInputStream.read();
61 while (-1 != ch) {
62
63 stringBuffer.append((char) ch);
64
65 final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
66 TestUtils.sleep(sleepMillis);
67 ch = demuxInputStream.read();
68 }
69 } catch (final Exception e) {
70 e.printStackTrace();
71 }
72 }
73 }
74
75 private static final class WriterThread extends Thread {
76 private final byte[] byteArray;
77 private final DemuxOutputStream demuxOutputStream;
78 private final OutputStream outputStream;
79
80 WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux) {
81 super(name);
82 outputStream = output;
83 demuxOutputStream = demux;
84 byteArray = data.getBytes();
85 }
86
87 @Override
88 public void run() {
89 demuxOutputStream.bindStream(outputStream);
90 for (final byte element : byteArray) {
91 try {
92
93 demuxOutputStream.write(element);
94 final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
95 TestUtils.sleep(sleepMillis);
96 } catch (final Exception e) {
97 e.printStackTrace();
98 }
99 }
100 }
101 }
102
103 private static final Random RANDOM = new Random();
104 private static final String DATA1 = "Data for thread1";
105
106 private static final String DATA2 = "Data for thread2";
107 private static final String DATA3 = "Data for thread3";
108 private static final String DATA4 = "Data for thread4";
109 private static final String T1 = "Thread1";
110
111 private static final String T2 = "Thread2";
112 private static final String T3 = "Thread3";
113 private static final String T4 = "Thread4";
114
115 private final HashMap<String, ByteArrayOutputStream> outputMap = new HashMap<>();
116
117 private final HashMap<String, Thread> threadMap = new HashMap<>();
118
119 private void doJoin() throws InterruptedException {
120 for (final String name : threadMap.keySet()) {
121 final Thread thread = threadMap.get(name);
122 thread.join();
123 }
124 }
125
126 private void doStart() {
127 threadMap.keySet().forEach(name -> threadMap.get(name).start());
128 }
129
130 private String getInput(final String threadName) {
131 final ReaderThread thread = (ReaderThread) threadMap.get(threadName);
132 assertNotNull(thread, "getInput()");
133 return thread.getData();
134 }
135
136 private String getOutput(final String threadName) {
137 final ByteArrayOutputStream output = outputMap.get(threadName);
138 assertNotNull(output, "getOutput()");
139 return output.toString(StandardCharsets.UTF_8);
140 }
141
142 private void startReader(final String name, final String data, final DemuxInputStream demux) {
143 final InputStream input = CharSequenceInputStream.builder().setCharSequence(data).get();
144 final ReaderThread thread = new ReaderThread(name, input, demux);
145 threadMap.put(name, thread);
146 }
147
148 private void startWriter(final String name, final String data, final DemuxOutputStream demux) {
149 final ByteArrayOutputStream output = new ByteArrayOutputStream();
150 outputMap.put(name, output);
151 final WriterThread thread = new WriterThread(name, data, output, demux);
152 threadMap.put(name, thread);
153 }
154
155 @Test
156 public void testInputStream() throws Exception {
157 try (final DemuxInputStream input = new DemuxInputStream()) {
158 startReader(T1, DATA1, input);
159 startReader(T2, DATA2, input);
160 startReader(T3, DATA3, input);
161 startReader(T4, DATA4, input);
162
163 doStart();
164 doJoin();
165
166 assertEquals(DATA1, getInput(T1), "Data1");
167 assertEquals(DATA2, getInput(T2), "Data2");
168 assertEquals(DATA3, getInput(T3), "Data3");
169 assertEquals(DATA4, getInput(T4), "Data4");
170 }
171 }
172
173 @Test
174 public void testOutputStream() throws Exception {
175 try (final DemuxOutputStream output = new DemuxOutputStream()) {
176 startWriter(T1, DATA1, output);
177 startWriter(T2, DATA2, output);
178 startWriter(T3, DATA3, output);
179 startWriter(T4, DATA4, output);
180
181 doStart();
182 doJoin();
183
184 assertEquals(DATA1, getOutput(T1), "Data1");
185 assertEquals(DATA2, getOutput(T2), "Data2");
186 assertEquals(DATA3, getOutput(T3), "Data3");
187 assertEquals(DATA4, getOutput(T4), "Data4");
188 }
189 }
190
191 @Test
192 public void testReadEOF() throws Exception {
193 try (final DemuxInputStream input = new DemuxInputStream()) {
194 assertEquals(IOUtils.EOF, input.read());
195 }
196 }
197 }