View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support.classic;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.hc.core5.annotation.Contract;
35  import org.apache.hc.core5.annotation.ThreadingBehavior;
36  import org.apache.hc.core5.http.nio.CapacityChannel;
37  
38  /**
39   * @since 5.0
40   */
41  @Contract(threading = ThreadingBehavior.SAFE)
42  public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
43  
44      private volatile CapacityChannel capacityChannel;
45  
46      public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
47          super(lock, initialBufferSize);
48      }
49  
50      public SharedInputBuffer(final int bufferSize) {
51          super(new ReentrantLock(), bufferSize);
52      }
53  
54      public int fill(final ByteBuffer src) {
55          lock.lock();
56          try {
57              setInputMode();
58              ensureAdjustedCapacity(buffer().position() + src.remaining());
59              buffer().put(src);
60              final int remaining = buffer().remaining();
61              condition.signalAll();
62              return remaining;
63          } finally {
64              lock.unlock();
65          }
66      }
67  
68      public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
69          lock.lock();
70          try {
71              this.capacityChannel = capacityChannel;
72              setInputMode();
73              if (buffer().hasRemaining()) {
74                  capacityChannel.update(buffer().remaining());
75              }
76          } finally {
77              lock.unlock();
78          }
79      }
80  
81      private void awaitInput() throws InterruptedIOException {
82          if (!buffer().hasRemaining()) {
83              setInputMode();
84              while (buffer().position() == 0 && !endStream && !aborted) {
85                  try {
86                      condition.await();
87                  } catch (final InterruptedException ex) {
88                      Thread.currentThread().interrupt();
89                      throw new InterruptedIOException(ex.getMessage());
90                  }
91              }
92              setOutputMode();
93          }
94      }
95  
96      @Override
97      public int read() throws IOException {
98          lock.lock();
99          try {
100             setOutputMode();
101             awaitInput();
102             if (aborted) {
103                 return -1;
104             }
105             if (!buffer().hasRemaining() && endStream) {
106                 return -1;
107             }
108             final int b = buffer().get() & 0xff;
109             if (!buffer().hasRemaining() && capacityChannel != null) {
110                 setInputMode();
111                 if (buffer().hasRemaining()) {
112                     capacityChannel.update(buffer().remaining());
113                 }
114             }
115             return b;
116         } finally {
117             lock.unlock();
118         }
119     }
120 
121     @Override
122     public int read(final byte[] b, final int off, final int len) throws IOException {
123         lock.lock();
124         try {
125             setOutputMode();
126             awaitInput();
127             if (aborted) {
128                 return -1;
129             }
130             if (!buffer().hasRemaining() && endStream) {
131                 return -1;
132             }
133             final int chunk = Math.min(buffer().remaining(), len);
134             buffer().get(b, off, chunk);
135             if (!buffer().hasRemaining() && capacityChannel != null) {
136                 setInputMode();
137                 if (buffer().hasRemaining()) {
138                     capacityChannel.update(buffer().remaining());
139                 }
140             }
141             return chunk;
142         } finally {
143             lock.unlock();
144         }
145     }
146 
147     public void markEndStream() {
148         if (endStream) {
149             return;
150         }
151         lock.lock();
152         try {
153             if (!endStream) {
154                 endStream = true;
155                 capacityChannel = null;
156                 condition.signalAll();
157             }
158         } finally {
159             lock.unlock();
160         }
161     }
162 
163 }