1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.hc.core5.annotation.Contract;
35 import org.apache.hc.core5.annotation.ThreadingBehavior;
36 import org.apache.hc.core5.http.nio.CapacityChannel;
37
38
39
40
41 @Contract(threading = ThreadingBehavior.SAFE)
42 public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
43
44 private volatile CapacityChannel capacityChannel;
45
46 public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
47 super(lock, initialBufferSize);
48 }
49
50 public SharedInputBuffer(final int bufferSize) {
51 super(new ReentrantLock(), bufferSize);
52 }
53
54 public int fill(final ByteBuffer src) {
55 lock.lock();
56 try {
57 setInputMode();
58 ensureAdjustedCapacity(buffer().position() + src.remaining());
59 buffer().put(src);
60 final int remaining = buffer().remaining();
61 condition.signalAll();
62 return remaining;
63 } finally {
64 lock.unlock();
65 }
66 }
67
68 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
69 lock.lock();
70 try {
71 this.capacityChannel = capacityChannel;
72 setInputMode();
73 if (buffer().hasRemaining()) {
74 capacityChannel.update(buffer().remaining());
75 }
76 } finally {
77 lock.unlock();
78 }
79 }
80
81 private void awaitInput() throws InterruptedIOException {
82 if (!buffer().hasRemaining()) {
83 setInputMode();
84 while (buffer().position() == 0 && !endStream && !aborted) {
85 try {
86 condition.await();
87 } catch (final InterruptedException ex) {
88 Thread.currentThread().interrupt();
89 throw new InterruptedIOException(ex.getMessage());
90 }
91 }
92 setOutputMode();
93 }
94 }
95
96 @Override
97 public int read() throws IOException {
98 lock.lock();
99 try {
100 setOutputMode();
101 awaitInput();
102 if (aborted) {
103 return -1;
104 }
105 if (!buffer().hasRemaining() && endStream) {
106 return -1;
107 }
108 final int b = buffer().get() & 0xff;
109 if (!buffer().hasRemaining() && capacityChannel != null) {
110 setInputMode();
111 if (buffer().hasRemaining()) {
112 capacityChannel.update(buffer().remaining());
113 }
114 }
115 return b;
116 } finally {
117 lock.unlock();
118 }
119 }
120
121 @Override
122 public int read(final byte[] b, final int off, final int len) throws IOException {
123 lock.lock();
124 try {
125 setOutputMode();
126 awaitInput();
127 if (aborted) {
128 return -1;
129 }
130 if (!buffer().hasRemaining() && endStream) {
131 return -1;
132 }
133 final int chunk = Math.min(buffer().remaining(), len);
134 buffer().get(b, off, chunk);
135 if (!buffer().hasRemaining() && capacityChannel != null) {
136 setInputMode();
137 if (buffer().hasRemaining()) {
138 capacityChannel.update(buffer().remaining());
139 }
140 }
141 return chunk;
142 } finally {
143 lock.unlock();
144 }
145 }
146
147 public void markEndStream() {
148 if (endStream) {
149 return;
150 }
151 lock.lock();
152 try {
153 if (!endStream) {
154 endStream = true;
155 capacityChannel = null;
156 condition.signalAll();
157 }
158 } finally {
159 lock.unlock();
160 }
161 }
162
163 }