1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.entity;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.Set;
32
33 import org.apache.hc.core5.annotation.Contract;
34 import org.apache.hc.core5.annotation.ThreadingBehavior;
35 import org.apache.hc.core5.http.ContentType;
36 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
37 import org.apache.hc.core5.http.nio.DataStreamChannel;
38 import org.apache.hc.core5.http.nio.StreamChannel;
39 import org.apache.hc.core5.util.Args;
40
41
42
43
44
45
46 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
47 public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
48
49 enum State { ACTIVE, FLUSHING, END_STREAM }
50
51 private final int fragmentSizeHint;
52 private final ByteBuffer bytebuf;
53 private final ContentType contentType;
54
55 private volatile State state;
56
57 public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentType contentType) {
58 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
59 this.bytebuf = ByteBuffer.allocate(this.fragmentSizeHint);
60 this.contentType = contentType;
61 this.state = State.ACTIVE;
62 }
63
64 private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
65 if (bytebuf.position() > 0) {
66 bytebuf.flip();
67 channel.write(bytebuf);
68 bytebuf.compact();
69 }
70 }
71
72 final int writeData(final StreamChannel<ByteBuffer> channel, final ByteBuffer src) throws IOException {
73 final int chunk = src.remaining();
74 if (chunk == 0) {
75 return 0;
76 }
77 if (chunk > fragmentSizeHint) {
78
79
80
81
82 flush(channel);
83 if (bytebuf.position() == 0) {
84 return channel.write(src);
85 }
86 } else {
87
88
89
90
91 if (bytebuf.remaining() < chunk) {
92 flush(channel);
93 }
94 if (bytebuf.remaining() >= chunk) {
95 bytebuf.put(src);
96 if (!bytebuf.hasRemaining()) {
97 flush(channel);
98 }
99 return chunk;
100 }
101 }
102 return 0;
103 }
104
105 final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
106 if (state == State.ACTIVE) {
107 state = State.FLUSHING;
108 flush(channel);
109 if (bytebuf.position() == 0) {
110 state = State.END_STREAM;
111 channel.endStream();
112 }
113 }
114 }
115
116
117
118
119
120
121
122
123 protected abstract int availableData();
124
125
126
127
128
129
130
131
132
133
134 protected abstract void produceData(StreamChannel<ByteBuffer> channel) throws IOException;
135
136 @Override
137 public final String getContentType() {
138 return contentType != null ? contentType.toString() : null;
139 }
140
141 @Override
142 public String getContentEncoding() {
143 return null;
144 }
145
146 @Override
147 public boolean isChunked() {
148 return false;
149 }
150
151 @Override
152 public Set<String> getTrailerNames() {
153 return null;
154 }
155
156 @Override
157 public long getContentLength() {
158 return -1;
159 }
160
161 @Override
162 public final int available() {
163 if (state == State.ACTIVE) {
164 return availableData();
165 } else {
166 synchronized (bytebuf) {
167 return bytebuf.position();
168 }
169 }
170 }
171
172 @Override
173 public final void produce(final DataStreamChannel channel) throws IOException {
174 synchronized (bytebuf) {
175 if (state == State.ACTIVE) {
176 produceData(new StreamChannel<ByteBuffer>() {
177
178 @Override
179 public int write(final ByteBuffer src) throws IOException {
180 Args.notNull(src, "Buffer");
181 synchronized (bytebuf) {
182 return writeData(channel, src);
183 }
184 }
185
186 @Override
187 public void endStream() throws IOException {
188 synchronized (bytebuf) {
189 streamEnd(channel);
190 }
191 }
192
193 });
194 }
195 if (state == State.FLUSHING) {
196 flush(channel);
197 if (bytebuf.position() == 0) {
198 state = State.END_STREAM;
199 channel.endStream();
200 }
201 }
202 }
203 }
204
205 @Override
206 public void releaseResources() {
207 state = State.ACTIVE;
208 }
209
210 }