1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support.classic;
29
30 import java.nio.ByteBuffer;
31 import java.nio.charset.Charset;
32 import java.nio.charset.StandardCharsets;
33 import java.util.Random;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38
39 import org.apache.hc.core5.http.nio.CapacityChannel;
40 import org.apache.hc.core5.util.Timeout;
41 import org.junit.jupiter.api.Assertions;
42 import org.junit.jupiter.api.Test;
43 import org.mockito.ArgumentMatchers;
44 import org.mockito.Mockito;
45
46 public class TestSharedInputBuffer {
47
48 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
49
50 @Test
51 public void testBasis() throws Exception {
52
53 final Charset charset = StandardCharsets.US_ASCII;
54 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
55 inputBuffer.fill(charset.encode("1234567890"));
56 Assertions.assertEquals(10, inputBuffer.length());
57
58 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
59
60 inputBuffer.updateCapacity(capacityChannel);
61 Mockito.verifyNoInteractions(capacityChannel);
62
63 inputBuffer.fill(charset.encode("1234567890"));
64 inputBuffer.fill(charset.encode("1234567890"));
65 Assertions.assertEquals(30, inputBuffer.length());
66
67 Mockito.verifyNoInteractions(capacityChannel);
68
69 final byte[] tmp = new byte[20];
70 final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
71 Assertions.assertEquals(20, bytesRead1);
72 Mockito.verifyNoInteractions(capacityChannel);
73
74 inputBuffer.markEndStream();
75
76 Assertions.assertEquals('1', inputBuffer.read());
77 Assertions.assertEquals('2', inputBuffer.read());
78 final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
79 Assertions.assertEquals(8, bytesRead2);
80 Mockito.verifyNoInteractions(capacityChannel);
81 Assertions.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
82 Assertions.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
83 Assertions.assertEquals(-1, inputBuffer.read());
84 Assertions.assertEquals(-1, inputBuffer.read());
85 }
86
87 @Test
88 public void testMultithreadingRead() throws Exception {
89
90 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
91
92 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
93
94 inputBuffer.updateCapacity(capacityChannel);
95 Mockito.verify(capacityChannel).update(10);
96 Mockito.reset(capacityChannel);
97
98 final ExecutorService executorService = Executors.newFixedThreadPool(2);
99 final Future<Boolean> task1 = executorService.submit(() -> {
100 final Charset charset = StandardCharsets.US_ASCII;
101 inputBuffer.fill(charset.encode("1234567890"));
102 return Boolean.TRUE;
103 });
104 final Future<Integer> task2 = executorService.submit(() -> {
105 final byte[] tmp = new byte[20];
106 return inputBuffer.read(tmp, 0, tmp.length);
107 });
108
109 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
110 Assertions.assertEquals(Integer.valueOf(10), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
111 Mockito.verify(capacityChannel).update(10);
112 }
113
114 @Test
115 public void testMultithreadingSingleRead() throws Exception {
116
117 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
118
119 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
120
121 inputBuffer.updateCapacity(capacityChannel);
122 Mockito.verify(capacityChannel).update(10);
123 Mockito.reset(capacityChannel);
124
125 final ExecutorService executorService = Executors.newFixedThreadPool(2);
126 final Future<Boolean> task1 = executorService.submit(() -> {
127 final Charset charset = StandardCharsets.US_ASCII;
128 inputBuffer.fill(charset.encode("a"));
129 return Boolean.TRUE;
130 });
131 final Future<Integer> task2 = executorService.submit((Callable<Integer>) inputBuffer::read);
132
133 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
134 Assertions.assertEquals(Integer.valueOf('a'), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
135 Mockito.verify(capacityChannel).update(1);
136 }
137
138 @Test
139 public void testMultithreadingReadStream() throws Exception {
140
141 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
142
143 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
144
145 inputBuffer.updateCapacity(capacityChannel);
146 Mockito.verify(capacityChannel).update(10);
147 Mockito.reset(capacityChannel);
148
149 final ExecutorService executorService = Executors.newFixedThreadPool(2);
150 final Future<Boolean> task1 = executorService.submit(() -> {
151 final Charset charset = StandardCharsets.US_ASCII;
152 final Random rnd = new Random(System.currentTimeMillis());
153 for (int i = 0; i < 5; i++) {
154 inputBuffer.fill(charset.encode("1234567890"));
155 Thread.sleep(rnd.nextInt(250));
156 }
157 inputBuffer.markEndStream();
158 return Boolean.TRUE;
159 });
160 final Future<String> task2 = executorService.submit(() -> {
161 final Charset charset = StandardCharsets.US_ASCII;
162 final StringBuilder buf = new StringBuilder();
163 final byte[] tmp = new byte[10];
164 int l;
165 while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
166 buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
167 }
168 return buf.toString();
169 });
170
171 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
172 Assertions.assertEquals("12345678901234567890123456789012345678901234567890",
173 task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
174 Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
175 }
176
177 @Test
178 public void testMultithreadingReadStreamAbort() throws Exception {
179
180 final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
181
182 final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
183
184 inputBuffer.updateCapacity(capacityChannel);
185 Mockito.verify(capacityChannel).update(10);
186 Mockito.reset(capacityChannel);
187
188 final ExecutorService executorService = Executors.newFixedThreadPool(2);
189 final Future<Boolean> task1 = executorService.submit(() -> {
190 Thread.sleep(1000);
191 inputBuffer.abort();
192 return Boolean.TRUE;
193 });
194 final Future<Integer> task2 = executorService.submit((Callable<Integer>) inputBuffer::read);
195
196 Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
197 Assertions.assertEquals(Integer.valueOf(-1), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
198 Mockito.verify(capacityChannel, Mockito.never()).update(10);
199 }
200
201 }
202