1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support.classic;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.Charset;
34 import java.nio.charset.StandardCharsets;
35 import java.util.List;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41
42 import org.apache.hc.core5.http.Header;
43 import org.apache.hc.core5.http.WritableByteChannelMock;
44 import org.apache.hc.core5.http.nio.DataStreamChannel;
45 import org.apache.hc.core5.util.Timeout;
46 import org.junit.Assert;
47 import org.junit.Test;
48 import org.mockito.Mockito;
49
50 public class TestSharedOutputBuffer {
51
52 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
53
54 static class DataStreamChannelMock implements DataStreamChannel {
55
56 private final WritableByteChannelMock channel;
57
58 DataStreamChannelMock(final WritableByteChannelMock channel) {
59 this.channel = channel;
60 }
61
62 @Override
63 public synchronized int write(final ByteBuffer src) throws IOException {
64 return channel.write(src);
65 }
66
67 @Override
68 public synchronized void requestOutput() {
69 notifyAll();
70 }
71
72 @Override
73 public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
74 channel.close();
75 notifyAll();
76 }
77
78 @Override
79 public void endStream() throws IOException {
80 endStream(null);
81 }
82
83 public synchronized void awaitOutputRequest() throws InterruptedException {
84 wait();
85 }
86
87 }
88
89 @Test
90 public void testBasis() throws Exception {
91
92 final Charset charset = StandardCharsets.US_ASCII;
93 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
94
95 final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
96 final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
97 outputBuffer.flush(dataStreamChannel);
98
99 Mockito.verifyZeroInteractions(dataStreamChannel);
100
101 Assert.assertEquals(0, outputBuffer.length());
102 Assert.assertEquals(30, outputBuffer.capacity());
103
104 final byte[] tmp = "1234567890".getBytes(charset);
105 outputBuffer.write(tmp, 0, tmp.length);
106 outputBuffer.write(tmp, 0, tmp.length);
107 outputBuffer.write('1');
108 outputBuffer.write('2');
109
110 Assert.assertEquals(22, outputBuffer.length());
111 Assert.assertEquals(8, outputBuffer.capacity());
112
113 Mockito.verifyZeroInteractions(dataStreamChannel);
114 }
115
116 @Test
117 public void testFlush() throws Exception {
118
119 final Charset charset = StandardCharsets.US_ASCII;
120 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
121
122 final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
123 final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
124 outputBuffer.flush(dataStreamChannel);
125
126 Assert.assertEquals(0, outputBuffer.length());
127 Assert.assertEquals(30, outputBuffer.capacity());
128
129 final byte[] tmp = "1234567890".getBytes(charset);
130 outputBuffer.write(tmp, 0, tmp.length);
131 outputBuffer.write(tmp, 0, tmp.length);
132 outputBuffer.write('1');
133 outputBuffer.write('2');
134
135 outputBuffer.flush(dataStreamChannel);
136
137 Assert.assertEquals(0, outputBuffer.length());
138 Assert.assertEquals(30, outputBuffer.capacity());
139 }
140
141 @Test
142 public void testMultithreadingWriteStream() throws Exception {
143
144 final Charset charset = StandardCharsets.US_ASCII;
145 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
146
147 final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
148 final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
149
150 final ExecutorService executorService = Executors.newFixedThreadPool(2);
151 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
152
153 @Override
154 public Boolean call() throws Exception {
155 final byte[] tmp = "1234567890".getBytes(charset);
156 outputBuffer.write(tmp, 0, tmp.length);
157 outputBuffer.write(tmp, 0, tmp.length);
158 outputBuffer.write('1');
159 outputBuffer.write('2');
160 outputBuffer.write(tmp, 0, tmp.length);
161 outputBuffer.write(tmp, 0, tmp.length);
162 outputBuffer.write(tmp, 0, tmp.length);
163 outputBuffer.writeCompleted();
164 outputBuffer.writeCompleted();
165 return Boolean.TRUE;
166 }
167
168 });
169 final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
170
171 @Override
172 public Boolean call() throws Exception {
173 for (;;) {
174 outputBuffer.flush(dataStreamChannel);
175 if (outputBuffer.isEndStream()) {
176 break;
177 }
178 if (!outputBuffer.hasData()) {
179 dataStreamChannel.awaitOutputRequest();
180 }
181 }
182 return Boolean.TRUE;
183 }
184
185 });
186
187 Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
188 Assert.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
189
190 Assert.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
191 }
192
193 @Test
194 public void testMultithreadingWriteStreamAbort() throws Exception {
195
196 final Charset charset = StandardCharsets.US_ASCII;
197 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
198
199 final ExecutorService executorService = Executors.newFixedThreadPool(2);
200 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
201
202 @Override
203 public Boolean call() throws Exception {
204 final byte[] tmp = "1234567890".getBytes(charset);
205 for (int i = 0; i < 20; i++) {
206 outputBuffer.write(tmp, 0, tmp.length);
207 }
208 outputBuffer.writeCompleted();
209 return Boolean.TRUE;
210 }
211
212 });
213 final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
214
215 @Override
216 public Boolean call() throws Exception {
217 Thread.sleep(200);
218 outputBuffer.abort();
219 return Boolean.TRUE;
220 }
221
222 });
223
224 Assert.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
225 try {
226 task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
227 } catch (final ExecutionException ex) {
228 Assert.assertTrue(ex.getCause() instanceof InterruptedIOException);
229 }
230 }
231
232 @Test
233 public void testEndStreamOnlyCalledOnce() throws IOException {
234
235 final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
236 final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
237
238 outputBuffer.flush(channel);
239
240 outputBuffer.writeCompleted();
241 outputBuffer.flush(channel);
242
243 Mockito.verify(channel, Mockito.times(1)).endStream();
244 }
245
246 }
247