View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.nio.support.classic;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.nio.charset.Charset;
34  import java.nio.charset.StandardCharsets;
35  import java.util.List;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  
41  import org.apache.hc.core5.http.Header;
42  import org.apache.hc.core5.http.WritableByteChannelMock;
43  import org.apache.hc.core5.http.nio.DataStreamChannel;
44  import org.apache.hc.core5.util.Timeout;
45  import org.junit.jupiter.api.Assertions;
46  import org.junit.jupiter.api.Test;
47  import org.mockito.Mockito;
48  
49  public class TestSharedOutputBuffer {
50  
51      private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
52  
53      static class DataStreamChannelMock implements DataStreamChannel {
54  
55          private final WritableByteChannelMock channel;
56  
57          DataStreamChannelMock(final WritableByteChannelMock channel) {
58              this.channel = channel;
59          }
60  
61          @Override
62          public synchronized int write(final ByteBuffer src) throws IOException {
63              return channel.write(src);
64          }
65  
66          @Override
67          public synchronized  void requestOutput() {
68              notifyAll();
69          }
70  
71          @Override
72          public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
73              channel.close();
74              notifyAll();
75          }
76  
77          @Override
78          public void endStream() throws IOException {
79              endStream(null);
80          }
81  
82          public synchronized void awaitOutputRequest() throws InterruptedException {
83              wait();
84          }
85  
86      }
87  
88      @Test
89      public void testBasis() throws Exception {
90  
91          final Charset charset = StandardCharsets.US_ASCII;
92          final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
93  
94          final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
95          final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
96          outputBuffer.flush(dataStreamChannel);
97  
98          Mockito.verifyNoInteractions(dataStreamChannel);
99  
100         Assertions.assertEquals(0, outputBuffer.length());
101         Assertions.assertEquals(30, outputBuffer.capacity());
102 
103         final byte[] tmp = "1234567890".getBytes(charset);
104         outputBuffer.write(tmp, 0, tmp.length);
105         outputBuffer.write(tmp, 0, tmp.length);
106         outputBuffer.write('1');
107         outputBuffer.write('2');
108 
109         Assertions.assertEquals(22, outputBuffer.length());
110         Assertions.assertEquals(8, outputBuffer.capacity());
111 
112         Mockito.verifyNoInteractions(dataStreamChannel);
113     }
114 
115     @Test
116     public void testFlush() throws Exception {
117 
118         final Charset charset = StandardCharsets.US_ASCII;
119         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
120 
121         final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
122         final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
123         outputBuffer.flush(dataStreamChannel);
124 
125         Assertions.assertEquals(0, outputBuffer.length());
126         Assertions.assertEquals(30, outputBuffer.capacity());
127 
128         final byte[] tmp = "1234567890".getBytes(charset);
129         outputBuffer.write(tmp, 0, tmp.length);
130         outputBuffer.write(tmp, 0, tmp.length);
131         outputBuffer.write('1');
132         outputBuffer.write('2');
133 
134         outputBuffer.flush(dataStreamChannel);
135 
136         Assertions.assertEquals(0, outputBuffer.length());
137         Assertions.assertEquals(30, outputBuffer.capacity());
138     }
139 
140     @Test
141     public void testMultithreadingWriteStream() throws Exception {
142 
143         final Charset charset = StandardCharsets.US_ASCII;
144         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
145 
146         final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
147         final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
148 
149         final ExecutorService executorService = Executors.newFixedThreadPool(2);
150         final Future<Boolean> task1 = executorService.submit(() -> {
151             final byte[] tmp = "1234567890".getBytes(charset);
152             outputBuffer.write(tmp, 0, tmp.length);
153             outputBuffer.write(tmp, 0, tmp.length);
154             outputBuffer.write('1');
155             outputBuffer.write('2');
156             outputBuffer.write(tmp, 0, tmp.length);
157             outputBuffer.write(tmp, 0, tmp.length);
158             outputBuffer.write(tmp, 0, tmp.length);
159             outputBuffer.writeCompleted();
160             outputBuffer.writeCompleted();
161             return Boolean.TRUE;
162         });
163         final Future<Boolean> task2 = executorService.submit(() -> {
164             for (;;) {
165                 outputBuffer.flush(dataStreamChannel);
166                 if (outputBuffer.isEndStream()) {
167                     break;
168                 }
169                 if (!outputBuffer.hasData()) {
170                     dataStreamChannel.awaitOutputRequest();
171                 }
172             }
173             return Boolean.TRUE;
174         });
175 
176         Assertions.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
177         Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
178 
179         Assertions.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
180     }
181 
182     @Test
183     public void testMultithreadingWriteStreamAbort() throws Exception {
184 
185         final Charset charset = StandardCharsets.US_ASCII;
186         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
187 
188         final ExecutorService executorService = Executors.newFixedThreadPool(2);
189         final Future<Boolean> task1 = executorService.submit(() -> {
190             final byte[] tmp = "1234567890".getBytes(charset);
191             for (int i = 0; i < 20; i++) {
192                 outputBuffer.write(tmp, 0, tmp.length);
193             }
194             outputBuffer.writeCompleted();
195             return Boolean.TRUE;
196         });
197         final Future<Boolean> task2 = executorService.submit(() -> {
198             Thread.sleep(200);
199             outputBuffer.abort();
200             return Boolean.TRUE;
201         });
202 
203         Assertions.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
204         try {
205             task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
206         } catch (final ExecutionException ex) {
207             Assertions.assertTrue(ex.getCause() instanceof InterruptedIOException);
208         }
209     }
210 
211     @Test
212     public void testEndStreamOnlyCalledOnce() throws IOException {
213 
214         final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
215         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
216 
217         outputBuffer.flush(channel);
218 
219         outputBuffer.writeCompleted();
220         outputBuffer.flush(channel);
221 
222         Mockito.verify(channel, Mockito.times(1)).endStream();
223     }
224 
225 }
226