View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support.classic;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.hc.core5.annotation.Contract;
35  import org.apache.hc.core5.annotation.ThreadingBehavior;
36  import org.apache.hc.core5.http.nio.DataStreamChannel;
37  
38  /**
39   * @since 5.0
40   */
41  @Contract(threading = ThreadingBehavior.SAFE)
42  public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
43  
44      private volatile DataStreamChannel dataStreamChannel;
45      private volatile boolean hasCapacity;
46      private volatile boolean endStreamPropagated;
47  
48      public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
49          super(lock, initialBufferSize);
50          this.hasCapacity = false;
51          this.endStreamPropagated = false;
52      }
53  
54      public SharedOutputBuffer(final int bufferSize) {
55          this(new ReentrantLock(), bufferSize);
56      }
57  
58      public void flush(final DataStreamChannel channel) throws IOException {
59          lock.lock();
60          try {
61              dataStreamChannel = channel;
62              hasCapacity = true;
63              setOutputMode();
64              if (buffer().hasRemaining()) {
65                  dataStreamChannel.write(buffer());
66              }
67              if (!buffer().hasRemaining() && endStream) {
68                  propagateEndStream();
69              }
70              condition.signalAll();
71          } finally {
72              lock.unlock();
73          }
74      }
75  
76      private void ensureNotAborted() throws InterruptedIOException {
77          if (aborted) {
78              throw new InterruptedIOException("Operation aborted");
79          }
80      }
81  
82      @Override
83      public void write(final byte[] b, final int off, final int len) throws IOException {
84          final ByteBuffer src = ByteBuffer.wrap(b, off, len);
85          lock.lock();
86          try {
87              ensureNotAborted();
88              setInputMode();
89              while (src.hasRemaining()) {
90                  // always buffer small chunks
91                  if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
92                      buffer().put(src);
93                  } else {
94                      if (buffer().position() > 0 || dataStreamChannel == null) {
95                          waitFlush();
96                      }
97                      if (buffer().position() == 0 && dataStreamChannel != null) {
98                          final int bytesWritten = dataStreamChannel.write(src);
99                          if (bytesWritten == 0) {
100                             hasCapacity = false;
101                             waitFlush();
102                         }
103                     }
104                 }
105             }
106         } finally {
107             lock.unlock();
108         }
109     }
110 
111     @Override
112     public void write(final int b) throws IOException {
113         lock.lock();
114         try {
115             ensureNotAborted();
116             setInputMode();
117             if (!buffer().hasRemaining()) {
118                 waitFlush();
119             }
120             buffer().put((byte)b);
121         } finally {
122             lock.unlock();
123         }
124     }
125 
126     @Override
127     public void writeCompleted() throws IOException {
128         if (endStream) {
129             return;
130         }
131         lock.lock();
132         try {
133             if (!endStream) {
134                 endStream = true;
135                 if (dataStreamChannel != null) {
136                     setOutputMode();
137                     if (buffer().hasRemaining()) {
138                         dataStreamChannel.requestOutput();
139                     } else {
140                         propagateEndStream();
141                     }
142                 }
143             }
144         } finally {
145             lock.unlock();
146         }
147     }
148 
149     private void waitFlush() throws InterruptedIOException {
150         setOutputMode();
151         if (dataStreamChannel != null) {
152             dataStreamChannel.requestOutput();
153         }
154         ensureNotAborted();
155         while (buffer().hasRemaining() || !hasCapacity) {
156             try {
157                 condition.await();
158             } catch (final InterruptedException ex) {
159                 Thread.currentThread().interrupt();
160                 throw new InterruptedIOException(ex.getMessage());
161             }
162             ensureNotAborted();
163         }
164         setInputMode();
165     }
166 
167     private void propagateEndStream() throws IOException {
168         if (!endStreamPropagated) {
169             dataStreamChannel.endStream();
170             endStreamPropagated = true;
171         }
172     }
173 
174 }