View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.nio.support.classic;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.nio.charset.Charset;
34  import java.nio.charset.StandardCharsets;
35  import java.util.List;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.Future;
41  
42  import org.apache.hc.core5.http.Header;
43  import org.apache.hc.core5.http.WritableByteChannelMock;
44  import org.apache.hc.core5.http.nio.DataStreamChannel;
45  import org.apache.hc.core5.util.Timeout;
46  import org.junit.Assert;
47  import org.junit.Test;
48  import org.mockito.Mockito;
49  
50  public class TestSharedOutputBuffer {
51  
52      private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
53  
54      static class DataStreamChannelMock implements DataStreamChannel {
55  
56          private final WritableByteChannelMock channel;
57  
58          DataStreamChannelMock(final WritableByteChannelMock channel) {
59              this.channel = channel;
60          }
61  
62          @Override
63          public synchronized int write(final ByteBuffer src) throws IOException {
64              return channel.write(src);
65          }
66  
67          @Override
68          public synchronized  void requestOutput() {
69              notifyAll();
70          }
71  
72          @Override
73          public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
74              channel.close();
75              notifyAll();
76          }
77  
78          @Override
79          public void endStream() throws IOException {
80              endStream(null);
81          }
82  
83          public synchronized void awaitOutputRequest() throws InterruptedException {
84              wait();
85          }
86  
87      }
88  
89      @Test
90      public void testBasis() throws Exception {
91  
92          final Charset charset = StandardCharsets.US_ASCII;
93          final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
94  
95          final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
96          final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
97          outputBuffer.flush(dataStreamChannel);
98  
99          Mockito.verifyZeroInteractions(dataStreamChannel);
100 
101         Assert.assertEquals(0, outputBuffer.length());
102         Assert.assertEquals(30, outputBuffer.capacity());
103 
104         final byte[] tmp = "1234567890".getBytes(charset);
105         outputBuffer.write(tmp, 0, tmp.length);
106         outputBuffer.write(tmp, 0, tmp.length);
107         outputBuffer.write('1');
108         outputBuffer.write('2');
109 
110         Assert.assertEquals(22, outputBuffer.length());
111         Assert.assertEquals(8, outputBuffer.capacity());
112 
113         Mockito.verifyZeroInteractions(dataStreamChannel);
114     }
115 
116     @Test
117     public void testFlush() throws Exception {
118 
119         final Charset charset = StandardCharsets.US_ASCII;
120         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
121 
122         final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
123         final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
124         outputBuffer.flush(dataStreamChannel);
125 
126         Assert.assertEquals(0, outputBuffer.length());
127         Assert.assertEquals(30, outputBuffer.capacity());
128 
129         final byte[] tmp = "1234567890".getBytes(charset);
130         outputBuffer.write(tmp, 0, tmp.length);
131         outputBuffer.write(tmp, 0, tmp.length);
132         outputBuffer.write('1');
133         outputBuffer.write('2');
134 
135         outputBuffer.flush(dataStreamChannel);
136 
137         Assert.assertEquals(0, outputBuffer.length());
138         Assert.assertEquals(30, outputBuffer.capacity());
139     }
140 
141     @Test
142     public void testMultithreadingWriteStream() throws Exception {
143 
144         final Charset charset = StandardCharsets.US_ASCII;
145         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
146 
147         final WritableByteChannelMockyteChannelMock.html#WritableByteChannelMock">WritableByteChannelMock channel = new WritableByteChannelMock(1024);
148         final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
149 
150         final ExecutorService executorService = Executors.newFixedThreadPool(2);
151         final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
152 
153             @Override
154             public Boolean call() throws Exception {
155                 final byte[] tmp = "1234567890".getBytes(charset);
156                 outputBuffer.write(tmp, 0, tmp.length);
157                 outputBuffer.write(tmp, 0, tmp.length);
158                 outputBuffer.write('1');
159                 outputBuffer.write('2');
160                 outputBuffer.write(tmp, 0, tmp.length);
161                 outputBuffer.write(tmp, 0, tmp.length);
162                 outputBuffer.write(tmp, 0, tmp.length);
163                 outputBuffer.writeCompleted();
164                 outputBuffer.writeCompleted();
165                 return Boolean.TRUE;
166             }
167 
168         });
169         final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
170 
171             @Override
172             public Boolean call() throws Exception {
173                 for (;;) {
174                     outputBuffer.flush(dataStreamChannel);
175                     if (outputBuffer.isEndStream()) {
176                         break;
177                     }
178                     if (!outputBuffer.hasData()) {
179                         dataStreamChannel.awaitOutputRequest();
180                     }
181                 }
182                 return Boolean.TRUE;
183             }
184 
185         });
186 
187         Assert.assertEquals(Boolean.TRUE, task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
188         Assert.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
189 
190         Assert.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
191     }
192 
193     @Test
194     public void testMultithreadingWriteStreamAbort() throws Exception {
195 
196         final Charset charset = StandardCharsets.US_ASCII;
197         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
198 
199         final ExecutorService executorService = Executors.newFixedThreadPool(2);
200         final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
201 
202             @Override
203             public Boolean call() throws Exception {
204                 final byte[] tmp = "1234567890".getBytes(charset);
205                 for (int i = 0; i < 20; i++) {
206                     outputBuffer.write(tmp, 0, tmp.length);
207                 }
208                 outputBuffer.writeCompleted();
209                 return Boolean.TRUE;
210             }
211 
212         });
213         final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
214 
215             @Override
216             public Boolean call() throws Exception {
217                 Thread.sleep(200);
218                 outputBuffer.abort();
219                 return Boolean.TRUE;
220             }
221 
222         });
223 
224         Assert.assertEquals(Boolean.TRUE, task2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
225         try {
226             task1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
227         } catch (final ExecutionException ex) {
228             Assert.assertTrue(ex.getCause() instanceof InterruptedIOException);
229         }
230     }
231 
232     @Test
233     public void testEndStreamOnlyCalledOnce() throws IOException {
234 
235         final DataStreamChannel channel = Mockito.mock(DataStreamChannel.class);
236         final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
237 
238         outputBuffer.flush(channel);
239 
240         outputBuffer.writeCompleted();
241         outputBuffer.flush(channel);
242 
243         Mockito.verify(channel, Mockito.times(1)).endStream();
244     }
245 
246 }
247