View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.util;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.locks.Condition;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.http.annotation.ThreadingBehavior;
35  import org.apache.http.annotation.Contract;
36  import org.apache.http.nio.ContentDecoder;
37  import org.apache.http.nio.IOControl;
38  
39  /**
40   * Implementation of the {@link ContentInputBuffer} interface that can be
41   * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
42   * a worker thread.
43   * <p>
44   * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer
45   *   by calling {@link #consumeContent(ContentDecoder)}.
46   * <p>
47   * The worker thread is expected to read the data from the buffer by calling
48   *   {@link #read()} or {@link #read(byte[], int, int)} methods.
49   * <p>
50   * In case of an abnormal situation or when no longer needed the buffer must be shut down
51   * using {@link #shutdown()} method.
52   *
53   * @since 4.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
56  public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
57  
58      private final ReentrantLock lock;
59      private final Condition condition;
60  
61      private volatile IOControl ioControl;
62      private volatile boolean shutdown = false;
63      private volatile boolean endOfStream = false;
64  
65      /**
66       * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)}
67       */
68      @Deprecated
69      public SharedInputBuffer(final int bufferSize, final IOControl ioControl, final ByteBufferAllocator allocator) {
70          super(bufferSize, allocator);
71          this.ioControl = ioControl;
72          this.lock = new ReentrantLock();
73          this.condition = this.lock.newCondition();
74      }
75  
76      /**
77       * @since 4.3
78       */
79      public SharedInputBuffer(final int bufferSize, final ByteBufferAllocator allocator) {
80          super(bufferSize, allocator);
81          this.lock = new ReentrantLock();
82          this.condition = this.lock.newCondition();
83      }
84  
85      /**
86       * @since 4.3
87       */
88      public SharedInputBuffer(final int bufferSize) {
89          this(bufferSize, HeapByteBufferAllocator.INSTANCE);
90      }
91  
92      @Override
93      public void reset() {
94          if (this.shutdown) {
95              return;
96          }
97          this.lock.lock();
98          try {
99              clear();
100             this.endOfStream = false;
101         } finally {
102             this.lock.unlock();
103         }
104     }
105 
106     /**
107      * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)}
108      */
109     @Override
110     @Deprecated
111     public int consumeContent(final ContentDecoder decoder) throws IOException {
112         return consumeContent(decoder, null);
113     }
114 
115     /**
116      * @since 4.3
117      */
118     public int consumeContent(final ContentDecoder decoder, final IOControl ioControl) throws IOException {
119         if (this.shutdown) {
120             return -1;
121         }
122         this.lock.lock();
123         try {
124             if (ioControl != null) {
125                 this.ioControl = ioControl;
126             }
127             setInputMode();
128             int totalRead = 0;
129             int bytesRead;
130             while ((bytesRead = decoder.read(this.buffer)) > 0) {
131                 totalRead += bytesRead;
132             }
133             if (bytesRead == -1 || decoder.isCompleted()) {
134                 this.endOfStream = true;
135             }
136             if (!this.buffer.hasRemaining()) {
137                 if (this.ioControl != null) {
138                     this.ioControl.suspendInput();
139                 }
140             }
141             this.condition.signalAll();
142 
143             if (totalRead > 0) {
144                 return totalRead;
145             }
146             return this.endOfStream ? -1 : 0;
147         } finally {
148             this.lock.unlock();
149         }
150     }
151 
152     @Override
153     public boolean hasData() {
154         this.lock.lock();
155         try {
156             return super.hasData();
157         } finally {
158             this.lock.unlock();
159         }
160     }
161 
162     @Override
163     public int available() {
164         this.lock.lock();
165         try {
166             return super.available();
167         } finally {
168             this.lock.unlock();
169         }
170     }
171 
172     @Override
173     public int capacity() {
174         this.lock.lock();
175         try {
176             return super.capacity();
177         } finally {
178             this.lock.unlock();
179         }
180     }
181 
182     @Override
183     public int length() {
184         this.lock.lock();
185         try {
186             return super.length();
187         } finally {
188             this.lock.unlock();
189         }
190     }
191 
192     protected void waitForData() throws IOException {
193         this.lock.lock();
194         try {
195             try {
196                 while (!super.hasData() && !this.endOfStream) {
197                     if (this.shutdown) {
198                         throw new InterruptedIOException("Input operation aborted");
199                     }
200                     if (this.ioControl != null) {
201                         this.ioControl.requestInput();
202                     }
203                     this.condition.await();
204                 }
205             } catch (final InterruptedException ex) {
206                 throw new IOException("Interrupted while waiting for more data");
207             }
208         } finally {
209             this.lock.unlock();
210         }
211     }
212 
213     public void close() {
214         if (this.shutdown) {
215             return;
216         }
217         this.endOfStream = true;
218         this.lock.lock();
219         try {
220             this.condition.signalAll();
221         } finally {
222             this.lock.unlock();
223         }
224     }
225 
226     public void shutdown() {
227         if (this.shutdown) {
228             return;
229         }
230         this.shutdown = true;
231         this.lock.lock();
232         try {
233             this.condition.signalAll();
234         } finally {
235             this.lock.unlock();
236         }
237     }
238 
239     protected boolean isShutdown() {
240         return this.shutdown;
241     }
242 
243     protected boolean isEndOfStream() {
244         return this.shutdown || (!hasData() && this.endOfStream);
245     }
246 
247     @Override
248     public int read() throws IOException {
249         if (this.shutdown) {
250             return -1;
251         }
252         this.lock.lock();
253         try {
254             if (!hasData()) {
255                 waitForData();
256             }
257             if (isEndOfStream()) {
258                 return -1;
259             }
260             return this.buffer.get() & 0xff;
261         } finally {
262             this.lock.unlock();
263         }
264     }
265 
266     @Override
267     public int read(final byte[] b, final int off, final int len) throws IOException {
268         if (this.shutdown) {
269             return -1;
270         }
271         if (b == null) {
272             return 0;
273         }
274         this.lock.lock();
275         try {
276             if (!hasData()) {
277                 waitForData();
278             }
279             if (isEndOfStream()) {
280                 return -1;
281             }
282             setOutputMode();
283             int chunk = len;
284             if (chunk > this.buffer.remaining()) {
285                 chunk = this.buffer.remaining();
286             }
287             this.buffer.get(b, off, chunk);
288             return chunk;
289         } finally {
290             this.lock.unlock();
291         }
292     }
293 
294     public int read(final byte[] b) throws IOException {
295         if (this.shutdown) {
296             return -1;
297         }
298         if (b == null) {
299             return 0;
300         }
301         return read(b, 0, b.length);
302     }
303 
304 }