1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.apache.hc.core5.annotation.Contract;
36 import org.apache.hc.core5.annotation.ThreadingBehavior;
37 import org.apache.hc.core5.http.nio.CapacityChannel;
38
39
40
41
42 @Contract(threading = ThreadingBehavior.SAFE)
43 public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
44
45 private final int initialBufferSize;
46 private final AtomicInteger capacityIncrement;
47
48 private volatile CapacityChannel capacityChannel;
49
50 public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
51 super(lock, initialBufferSize);
52 this.initialBufferSize = initialBufferSize;
53 this.capacityIncrement = new AtomicInteger(0);
54 }
55
56 public SharedInputBuffer(final int bufferSize) {
57 this(new ReentrantLock(), bufferSize);
58 }
59
60 public int fill(final ByteBuffer src) {
61 lock.lock();
62 try {
63 setInputMode();
64 ensureAdjustedCapacity(buffer().position() + src.remaining());
65 buffer().put(src);
66 final int remaining = buffer().remaining();
67 condition.signalAll();
68 return remaining;
69 } finally {
70 lock.unlock();
71 }
72 }
73
74 private void incrementCapacity() throws IOException {
75 if (capacityChannel != null) {
76 final int increment = capacityIncrement.getAndSet(0);
77 if (increment > 0) {
78 capacityChannel.update(increment);
79 }
80 }
81 }
82
83 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
84 lock.lock();
85 try {
86 this.capacityChannel = capacityChannel;
87 setInputMode();
88 if (buffer().position() == 0) {
89 capacityChannel.update(initialBufferSize);
90 }
91 } finally {
92 lock.unlock();
93 }
94 }
95
96 private void awaitInput() throws InterruptedIOException {
97 if (!buffer().hasRemaining()) {
98 setInputMode();
99 while (buffer().position() == 0 && !endStream && !aborted) {
100 try {
101 condition.await();
102 } catch (final InterruptedException ex) {
103 Thread.currentThread().interrupt();
104 throw new InterruptedIOException(ex.getMessage());
105 }
106 }
107 setOutputMode();
108 }
109 }
110
111 @Override
112 public int read() throws IOException {
113 lock.lock();
114 try {
115 setOutputMode();
116 awaitInput();
117 if (aborted) {
118 return -1;
119 }
120 if (!buffer().hasRemaining() && endStream) {
121 return -1;
122 }
123 final int b = buffer().get() & 0xff;
124 capacityIncrement.incrementAndGet();
125 if (!buffer().hasRemaining()) {
126 incrementCapacity();
127 }
128 return b;
129 } finally {
130 lock.unlock();
131 }
132 }
133
134 @Override
135 public int read(final byte[] b, final int off, final int len) throws IOException {
136 lock.lock();
137 try {
138 setOutputMode();
139 awaitInput();
140 if (aborted) {
141 return -1;
142 }
143 if (!buffer().hasRemaining() && endStream) {
144 return -1;
145 }
146 final int chunk = Math.min(buffer().remaining(), len);
147 buffer().get(b, off, chunk);
148 capacityIncrement.addAndGet(chunk);
149 if (!buffer().hasRemaining()) {
150 incrementCapacity();
151 }
152 return chunk;
153 } finally {
154 lock.unlock();
155 }
156 }
157
158 public void markEndStream() {
159 if (endStream) {
160 return;
161 }
162 lock.lock();
163 try {
164 if (!endStream) {
165 endStream = true;
166 capacityChannel = null;
167 condition.signalAll();
168 }
169 } finally {
170 lock.unlock();
171 }
172 }
173
174 }