1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support.classic;
29
30 import java.nio.ByteBuffer;
31 import java.nio.charset.Charset;
32 import java.nio.charset.StandardCharsets;
33 import java.util.Random;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38
39 import org.apache.hc.core5.http.nio.CapacityChannel;
40 import org.apache.hc.core5.util.Timeout;
41 import org.junit.Assert;
42 import org.junit.Test;
43 import org.mockito.ArgumentMatchers;
44 import org.mockito.Mockito;
45
46 public class TestSharedInputBuffer {
47
48 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
49
50 @Test
51 public void testBasis() throws Exception {
52
53 final Charset charset = StandardCharsets.US_ASCII;
54 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
55 inputBuffer.fill(charset.encode("1234567890"));
56 Assert.assertEquals(10, inputBuffer.length());
57
58 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
59
60 inputBuffer.updateCapacity(capacityChannel);
61 Mockito.verifyZeroInteractions(capacityChannel);
62
63 inputBuffer.fill(charset.encode("1234567890"));
64 inputBuffer.fill(charset.encode("1234567890"));
65 Assert.assertEquals(30, inputBuffer.length());
66
67 Mockito.verifyZeroInteractions(capacityChannel);
68
69 final byte[] tmp = new byte[20];
70 final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
71 Assert.assertEquals(20, bytesRead1);
72 Mockito.verifyZeroInteractions(capacityChannel);
73
74 inputBuffer.markEndStream();
75
76 Assert.assertEquals('1', inputBuffer.read());
77 Assert.assertEquals('2', inputBuffer.read());
78 final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
79 Assert.assertEquals(8, bytesRead2);
80 Mockito.verifyZeroInteractions(capacityChannel);
81 Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
82 Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
83 Assert.assertEquals(-1, inputBuffer.read());
84 Assert.assertEquals(-1, inputBuffer.read());
85 }
86
87 @Test
88 public void testMultithreadingRead() throws Exception {
89
90 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
91
92 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
93
94 inputBuffer.updateCapacity(capacityChannel);
95 Mockito.verify(capacityChannel).update(10);
96 Mockito.reset(capacityChannel);
97
98 final ExecutorService executorService = Executors.newFixedThreadPool(2);
99 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
100
101 @Override
102 public Boolean call() throws Exception {
103 final Charset charset = StandardCharsets.US_ASCII;
104 inputBuffer.fill(charset.encode("1234567890"));
105 return Boolean.TRUE;
106 }
107
108 });
109 final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
110
111 @Override
112 public Integer call() throws Exception {
113 final byte[] tmp = new byte[20];
114 return inputBuffer.read(tmp, 0, tmp.length);
115 }
116
117 });
118
119 Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
120 Assert.assertEquals(Integer.valueOf(10), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
121 Mockito.verify(capacityChannel).update(10);
122 }
123
124 @Test
125 public void testMultithreadingSingleRead() throws Exception {
126
127 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
128
129 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
130
131 inputBuffer.updateCapacity(capacityChannel);
132 Mockito.verify(capacityChannel).update(10);
133 Mockito.reset(capacityChannel);
134
135 final ExecutorService executorService = Executors.newFixedThreadPool(2);
136 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
137
138 @Override
139 public Boolean call() throws Exception {
140 final Charset charset = StandardCharsets.US_ASCII;
141 inputBuffer.fill(charset.encode("a"));
142 return Boolean.TRUE;
143 }
144
145 });
146 final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
147
148 @Override
149 public Integer call() throws Exception {
150 return inputBuffer.read();
151 }
152
153 });
154
155 Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
156 Assert.assertEquals(Integer.valueOf('a'), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
157 Mockito.verify(capacityChannel).update(10);
158 }
159
160 @Test
161 public void testMultithreadingReadStream() throws Exception {
162
163 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
164
165 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
166
167 inputBuffer.updateCapacity(capacityChannel);
168 Mockito.verify(capacityChannel).update(10);
169 Mockito.reset(capacityChannel);
170
171 final ExecutorService executorService = Executors.newFixedThreadPool(2);
172 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
173
174 @Override
175 public Boolean call() throws Exception {
176 final Charset charset = StandardCharsets.US_ASCII;
177 final Random rnd = new Random(System.currentTimeMillis());
178 for (int i = 0; i < 5; i++) {
179 inputBuffer.fill(charset.encode("1234567890"));
180 Thread.sleep(rnd.nextInt(250));
181 }
182 inputBuffer.markEndStream();
183 return Boolean.TRUE;
184 }
185
186 });
187 final Future<String> task2 = executorService.submit(new Callable<String>() {
188
189 @Override
190 public String call() throws Exception {
191 final Charset charset = StandardCharsets.US_ASCII;
192 final StringBuilder buf = new StringBuilder();
193 final byte[] tmp = new byte[10];
194 int l;
195 while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
196 buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
197 }
198 return buf.toString();
199 }
200
201 });
202
203 Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
204 Assert.assertEquals("12345678901234567890123456789012345678901234567890",
205 task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
206 Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
207 }
208
209 @Test
210 public void testMultithreadingReadStreamAbort() throws Exception {
211
212 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
213
214 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
215
216 inputBuffer.updateCapacity(capacityChannel);
217 Mockito.verify(capacityChannel).update(10);
218 Mockito.reset(capacityChannel);
219
220 final ExecutorService executorService = Executors.newFixedThreadPool(2);
221 final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
222
223 @Override
224 public Boolean call() throws Exception {
225 Thread.sleep(1000);
226 inputBuffer.abort();
227 return Boolean.TRUE;
228 }
229
230 });
231 final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
232
233 @Override
234 public Integer call() throws Exception {
235 return inputBuffer.read();
236 }
237
238 });
239
240 Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
241 Assert.assertEquals(Integer.valueOf(-1), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
242 Mockito.verify(capacityChannel, Mockito.never()).update(10);
243 }
244
245 }
246