View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.nio.support.classic;
29  
30  import java.nio.ByteBuffer;
31  import java.nio.charset.Charset;
32  import java.nio.charset.StandardCharsets;
33  import java.util.Random;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Future;
38  
39  import org.apache.hc.core5.http.nio.CapacityChannel;
40  import org.apache.hc.core5.util.Timeout;
41  import org.junit.jupiter.api.Assertions;
42  import org.junit.jupiter.api.Test;
43  import org.mockito.ArgumentMatchers;
44  import org.mockito.Mockito;
45  
46  public class TestSharedInputBuffer {
47  
48      private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
49  
50      @Test
51      public void testBasis() throws Exception {
52  
53          final Charset charset = StandardCharsets.US_ASCII;
54          final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
55          inputBuffer.fill(charset.encode("1234567890"));
56          Assertions.assertEquals(10, inputBuffer.length());
57  
58          final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
59  
60          inputBuffer.updateCapacity(capacityChannel);
61          Mockito.verifyNoInteractions(capacityChannel);
62  
63          inputBuffer.fill(charset.encode("1234567890"));
64          inputBuffer.fill(charset.encode("1234567890"));
65          Assertions.assertEquals(30, inputBuffer.length());
66  
67          Mockito.verifyNoInteractions(capacityChannel);
68  
69          final byte[] tmp = new byte[20];
70          final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
71          Assertions.assertEquals(20, bytesRead1);
72          Mockito.verifyNoInteractions(capacityChannel);
73  
74          inputBuffer.markEndStream();
75  
76          Assertions.assertEquals('1', inputBuffer.read());
77          Assertions.assertEquals('2', inputBuffer.read());
78          final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
79          Assertions.assertEquals(8, bytesRead2);
80          Mockito.verifyNoInteractions(capacityChannel);
81          Assertions.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
82          Assertions.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
83          Assertions.assertEquals(-1, inputBuffer.read());
84          Assertions.assertEquals(-1, inputBuffer.read());
85      }
86  
87      @Test
88      public void testMultithreadingRead() throws Exception {
89  
90          final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
91  
92          final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
93  
94          inputBuffer.updateCapacity(capacityChannel);
95          Mockito.verify(capacityChannel).update(10);
96          Mockito.reset(capacityChannel);
97  
98          final ExecutorService executorService = Executors.newFixedThreadPool(2);
99          final Future<Boolean> task1 = executorService.submit(() -> {
100             final Charset charset = StandardCharsets.US_ASCII;
101             inputBuffer.fill(charset.encode("1234567890"));
102             return Boolean.TRUE;
103         });
104         final Future<Integer> task2 = executorService.submit(() -> {
105             final byte[] tmp = new byte[20];
106             return inputBuffer.read(tmp, 0, tmp.length);
107         });
108 
109         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
110         Assertions.assertEquals(Integer.valueOf(10), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
111         Mockito.verify(capacityChannel).update(10);
112     }
113 
114     @Test
115     public void testMultithreadingSingleRead() throws Exception {
116 
117         final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
118 
119         final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
120 
121         inputBuffer.updateCapacity(capacityChannel);
122         Mockito.verify(capacityChannel).update(10);
123         Mockito.reset(capacityChannel);
124 
125         final ExecutorService executorService = Executors.newFixedThreadPool(2);
126         final Future<Boolean> task1 = executorService.submit(() -> {
127             final Charset charset = StandardCharsets.US_ASCII;
128             inputBuffer.fill(charset.encode("a"));
129             return Boolean.TRUE;
130         });
131         final Future<Integer> task2 = executorService.submit((Callable<Integer>) inputBuffer::read);
132 
133         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
134         Assertions.assertEquals(Integer.valueOf('a'), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
135         Mockito.verify(capacityChannel).update(1);
136     }
137 
138     @Test
139     public void testMultithreadingReadStream() throws Exception {
140 
141         final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
142 
143         final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
144 
145         inputBuffer.updateCapacity(capacityChannel);
146         Mockito.verify(capacityChannel).update(10);
147         Mockito.reset(capacityChannel);
148 
149         final ExecutorService executorService = Executors.newFixedThreadPool(2);
150         final Future<Boolean> task1 = executorService.submit(() -> {
151             final Charset charset = StandardCharsets.US_ASCII;
152             final Random rnd = new Random(System.currentTimeMillis());
153             for (int i = 0; i < 5; i++) {
154                 inputBuffer.fill(charset.encode("1234567890"));
155                 Thread.sleep(rnd.nextInt(250));
156             }
157             inputBuffer.markEndStream();
158             return Boolean.TRUE;
159         });
160         final Future<String> task2 = executorService.submit(() -> {
161             final Charset charset = StandardCharsets.US_ASCII;
162             final StringBuilder buf = new StringBuilder();
163             final byte[] tmp = new byte[10];
164             int l;
165             while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
166                 buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
167             }
168             return buf.toString();
169         });
170 
171         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
172         Assertions.assertEquals("12345678901234567890123456789012345678901234567890",
173                 task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
174         Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
175     }
176 
177     @Test
178     public void testMultithreadingReadStreamAbort() throws Exception {
179 
180         final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
181 
182         final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
183 
184         inputBuffer.updateCapacity(capacityChannel);
185         Mockito.verify(capacityChannel).update(10);
186         Mockito.reset(capacityChannel);
187 
188         final ExecutorService executorService = Executors.newFixedThreadPool(2);
189         final Future<Boolean> task1 = executorService.submit(() -> {
190             Thread.sleep(1000);
191             inputBuffer.abort();
192             return Boolean.TRUE;
193         });
194         final Future<Integer> task2 = executorService.submit((Callable<Integer>) inputBuffer::read);
195 
196         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
197         Assertions.assertEquals(Integer.valueOf(-1), task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
198         Mockito.verify(capacityChannel, Mockito.never()).update(10);
199     }
200 
201 }
202