1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.CharBuffer;
32 import java.nio.charset.Charset;
33 import java.nio.charset.CharsetEncoder;
34 import java.nio.charset.CoderResult;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Set;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.http.ContentType;
41 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
42 import org.apache.hc.core5.http.nio.DataStreamChannel;
43 import org.apache.hc.core5.http.nio.StreamChannel;
44 import org.apache.hc.core5.util.Args;
45
46
47
48
49
50
51 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
52 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
53
54 private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
55
56 enum State { ACTIVE, FLUSHING, END_STREAM }
57
58 private final ByteBuffer bytebuf;
59 private final int fragmentSizeHint;
60 private final ContentType contentType;
61 private final CharsetEncoder charsetEncoder;
62
63 private volatile State state;
64
65 public AbstractCharAsyncEntityProducer(
66 final int bufferSize,
67 final int fragmentSizeHint,
68 final ContentType contentType) {
69 Args.positive(bufferSize, "Buffer size");
70 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
71 this.bytebuf = ByteBuffer.allocate(bufferSize);
72 this.contentType = contentType;
73 Charset charset = contentType != null ? contentType.getCharset() : null;
74 if (charset == null) {
75 charset = StandardCharsets.US_ASCII;
76 }
77 this.charsetEncoder = charset.newEncoder();
78 this.state = State.ACTIVE;
79 }
80
81 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
82 if (bytebuf.position() > 0) {
83 bytebuf.flip();
84 channel.write(bytebuf);
85 bytebuf.compact();
86 }
87 }
88
89 final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
90
91 final int chunk = src.remaining();
92 if (chunk == 0) {
93 return 0;
94 }
95
96 final int p = src.position();
97 final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
98 if (result.isError()) {
99 result.throwException();
100 }
101
102 if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
103 flush(channel);
104 }
105
106 return src.position() - p;
107 }
108
109 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
110 if (state == State.ACTIVE) {
111 state = State.FLUSHING;
112 if (!bytebuf.hasRemaining()) {
113 flush(channel);
114 }
115
116 final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
117 if (result.isError()) {
118 result.throwException();
119 }
120 final CoderResult result2 = charsetEncoder.flush(bytebuf);
121 if (result2.isError()) {
122 result.throwException();
123 } else if (result.isUnderflow()) {
124 flush(channel);
125 if (bytebuf.position() == 0) {
126 state = State.END_STREAM;
127 channel.endStream();
128 }
129 }
130 }
131
132 }
133
134
135
136
137
138
139
140
141 protected abstract int availableData();
142
143
144
145
146
147
148
149
150
151
152 protected abstract void produceData(StreamChannel<CharBuffer> channel) throws IOException;
153
154 @Override
155 public final String getContentType() {
156 return contentType != null ? contentType.toString() : null;
157 }
158
159 @Override
160 public String getContentEncoding() {
161 return null;
162 }
163
164 @Override
165 public long getContentLength() {
166 return -1;
167 }
168
169 @Override
170 public boolean isChunked() {
171 return false;
172 }
173
174 @Override
175 public Set<String> getTrailerNames() {
176 return null;
177 }
178
179 @Override
180 public final int available() {
181 if (state == State.ACTIVE) {
182 return availableData();
183 } else {
184 synchronized (bytebuf) {
185 return bytebuf.position();
186 }
187 }
188 }
189
190 @Override
191 public final void produce(final DataStreamChannel channel) throws IOException {
192 synchronized (bytebuf) {
193 if (state == State.ACTIVE) {
194 produceData(new StreamChannel<CharBuffer>() {
195
196 @Override
197 public int write(final CharBuffer src) throws IOException {
198 Args.notNull(src, "Buffer");
199 synchronized (bytebuf) {
200 return writeData(channel, src);
201 }
202 }
203
204 @Override
205 public void endStream() throws IOException {
206 synchronized (bytebuf) {
207 streamEnd(channel);
208 }
209 }
210
211 });
212 }
213 if (state == State.FLUSHING) {
214 final CoderResult result = charsetEncoder.flush(bytebuf);
215 if (result.isError()) {
216 result.throwException();
217 } else if (result.isOverflow()) {
218 flush(channel);
219 } else if (result.isUnderflow()) {
220 flush(channel);
221 if (bytebuf.position() == 0) {
222 state = State.END_STREAM;
223 channel.endStream();
224 }
225 }
226
227 }
228
229 }
230 }
231
232 @Override
233 public void releaseResources() {
234 state = State.ACTIVE;
235 charsetEncoder.reset();
236 }
237
238 }