1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.hc.core5.annotation.Contract;
35 import org.apache.hc.core5.annotation.ThreadingBehavior;
36 import org.apache.hc.core5.http.nio.DataStreamChannel;
37
38
39
40
41 @Contract(threading = ThreadingBehavior.SAFE)
42 public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
43
44 private volatile DataStreamChannel dataStreamChannel;
45 private volatile boolean hasCapacity;
46 private volatile boolean endStreamPropagated;
47
48 public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
49 super(lock, initialBufferSize);
50 this.hasCapacity = false;
51 this.endStreamPropagated = false;
52 }
53
54 public SharedOutputBuffer(final int bufferSize) {
55 this(new ReentrantLock(), bufferSize);
56 }
57
58 public void flush(final DataStreamChannel channel) throws IOException {
59 lock.lock();
60 try {
61 dataStreamChannel = channel;
62 hasCapacity = true;
63 setOutputMode();
64 if (buffer().hasRemaining()) {
65 dataStreamChannel.write(buffer());
66 }
67 if (!buffer().hasRemaining() && endStream) {
68 propagateEndStream();
69 }
70 condition.signalAll();
71 } finally {
72 lock.unlock();
73 }
74 }
75
76 private void ensureNotAborted() throws InterruptedIOException {
77 if (aborted) {
78 throw new InterruptedIOException("Operation aborted");
79 }
80 }
81
82 @Override
83 public void write(final byte[] b, final int off, final int len) throws IOException {
84 final ByteBuffer src = ByteBuffer.wrap(b, off, len);
85 lock.lock();
86 try {
87 ensureNotAborted();
88 setInputMode();
89 while (src.hasRemaining()) {
90
91 if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
92 buffer().put(src);
93 } else {
94 if (buffer().position() > 0 || dataStreamChannel == null) {
95 waitFlush();
96 }
97 if (buffer().position() == 0 && dataStreamChannel != null) {
98 final int bytesWritten = dataStreamChannel.write(src);
99 if (bytesWritten == 0) {
100 hasCapacity = false;
101 waitFlush();
102 }
103 }
104 }
105 }
106 } finally {
107 lock.unlock();
108 }
109 }
110
111 @Override
112 public void write(final int b) throws IOException {
113 lock.lock();
114 try {
115 ensureNotAborted();
116 setInputMode();
117 if (!buffer().hasRemaining()) {
118 waitFlush();
119 }
120 buffer().put((byte)b);
121 } finally {
122 lock.unlock();
123 }
124 }
125
126 @Override
127 public void writeCompleted() throws IOException {
128 if (endStream) {
129 return;
130 }
131 lock.lock();
132 try {
133 if (!endStream) {
134 endStream = true;
135 if (dataStreamChannel != null) {
136 setOutputMode();
137 if (buffer().hasRemaining()) {
138 dataStreamChannel.requestOutput();
139 } else {
140 propagateEndStream();
141 }
142 }
143 }
144 } finally {
145 lock.unlock();
146 }
147 }
148
149 private void waitFlush() throws InterruptedIOException {
150 setOutputMode();
151 if (dataStreamChannel != null) {
152 dataStreamChannel.requestOutput();
153 }
154 ensureNotAborted();
155 while (buffer().hasRemaining() || !hasCapacity) {
156 try {
157 condition.await();
158 } catch (final InterruptedException ex) {
159 Thread.currentThread().interrupt();
160 throw new InterruptedIOException(ex.getMessage());
161 }
162 ensureNotAborted();
163 }
164 setInputMode();
165 }
166
167 private void propagateEndStream() throws IOException {
168 if (!endStreamPropagated) {
169 dataStreamChannel.endStream();
170 endStreamPropagated = true;
171 }
172 }
173
174 }