View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support.classic;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.locks.ReentrantLock;
34  
35  import org.apache.hc.core5.annotation.Contract;
36  import org.apache.hc.core5.annotation.ThreadingBehavior;
37  import org.apache.hc.core5.http.nio.CapacityChannel;
38  
39  /**
40   * @since 5.0
41   */
42  @Contract(threading = ThreadingBehavior.SAFE)
43  public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
44  
45      private final int initialBufferSize;
46      private final AtomicInteger capacityIncrement;
47  
48      private volatile CapacityChannel capacityChannel;
49  
50      public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
51          super(lock, initialBufferSize);
52          this.initialBufferSize = initialBufferSize;
53          this.capacityIncrement = new AtomicInteger(0);
54      }
55  
56      public SharedInputBuffer(final int bufferSize) {
57          this(new ReentrantLock(), bufferSize);
58      }
59  
60      public int fill(final ByteBuffer src) {
61          lock.lock();
62          try {
63              setInputMode();
64              ensureAdjustedCapacity(buffer().position() + src.remaining());
65              buffer().put(src);
66              final int remaining = buffer().remaining();
67              condition.signalAll();
68              return remaining;
69          } finally {
70              lock.unlock();
71          }
72      }
73  
74      private void incrementCapacity() throws IOException {
75          if (capacityChannel != null) {
76              final int increment = capacityIncrement.getAndSet(0);
77              if (increment > 0) {
78                  capacityChannel.update(increment);
79              }
80          }
81      }
82  
83      public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
84          lock.lock();
85          try {
86              this.capacityChannel = capacityChannel;
87              setInputMode();
88              if (buffer().position() == 0) {
89                  capacityChannel.update(initialBufferSize);
90              }
91          } finally {
92              lock.unlock();
93          }
94      }
95  
96      private void awaitInput() throws InterruptedIOException {
97          if (!buffer().hasRemaining()) {
98              setInputMode();
99              while (buffer().position() == 0 && !endStream && !aborted) {
100                 try {
101                     condition.await();
102                 } catch (final InterruptedException ex) {
103                     Thread.currentThread().interrupt();
104                     throw new InterruptedIOException(ex.getMessage());
105                 }
106             }
107             setOutputMode();
108         }
109     }
110 
111     @Override
112     public int read() throws IOException {
113         lock.lock();
114         try {
115             setOutputMode();
116             awaitInput();
117             if (aborted) {
118                 return -1;
119             }
120             if (!buffer().hasRemaining() && endStream) {
121                 return -1;
122             }
123             final int b = buffer().get() & 0xff;
124             capacityIncrement.incrementAndGet();
125             if (!buffer().hasRemaining()) {
126                 incrementCapacity();
127             }
128             return b;
129         } finally {
130             lock.unlock();
131         }
132     }
133 
134     @Override
135     public int read(final byte[] b, final int off, final int len) throws IOException {
136         lock.lock();
137         try {
138             setOutputMode();
139             awaitInput();
140             if (aborted) {
141                 return -1;
142             }
143             if (!buffer().hasRemaining() && endStream) {
144                 return -1;
145             }
146             final int chunk = Math.min(buffer().remaining(), len);
147             buffer().get(b, off, chunk);
148             capacityIncrement.addAndGet(chunk);
149             if (!buffer().hasRemaining()) {
150                 incrementCapacity();
151             }
152             return chunk;
153         } finally {
154             lock.unlock();
155         }
156     }
157 
158     public void markEndStream() {
159         if (endStream) {
160             return;
161         }
162         lock.lock();
163         try {
164             if (!endStream) {
165                 endStream = true;
166                 capacityChannel = null;
167                 condition.signalAll();
168             }
169         } finally {
170             lock.unlock();
171         }
172     }
173 
174 }