1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.OutputStream;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.Locale;
35 import java.util.Set;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpHeaders;
44 import org.apache.hc.core5.http.HttpRequest;
45 import org.apache.hc.core5.http.HttpResponse;
46 import org.apache.hc.core5.http.HttpStatus;
47 import org.apache.hc.core5.http.ProtocolVersion;
48 import org.apache.hc.core5.http.message.BasicHttpResponse;
49 import org.apache.hc.core5.http.message.HttpResponseWrapper;
50 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51 import org.apache.hc.core5.http.nio.CapacityChannel;
52 import org.apache.hc.core5.http.nio.DataStreamChannel;
53 import org.apache.hc.core5.http.nio.ResponseChannel;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.io.Closer;
56 import org.apache.hc.core5.util.Args;
57 import org.apache.hc.core5.util.Asserts;
58
59
60
61
62
63
64
65
66 public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
67
68 private enum State { IDLE, ACTIVE, COMPLETED }
69
70 private final int initialBufferSize;
71 private final Executor executor;
72 private final AtomicReference<State> state;
73 private final AtomicReference<Exception> exception;
74
75 private volatile SharedInputBuffer inputBuffer;
76 private volatile SharedOutputBuffer outputBuffer;
77
78 public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
79 this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
80 this.executor = Args.notNull(executor, "Executor");
81 this.exception = new AtomicReference<>();
82 this.state = new AtomicReference<>(State.IDLE);
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96 protected abstract void handle(
97 HttpRequest request, InputStream requestStream,
98 HttpResponse response, OutputStream responseStream,
99 HttpContext context) throws IOException, HttpException;
100
101 public Exception getException() {
102 return exception.get();
103 }
104
105 @Override
106 public final void handleRequest(
107 final HttpRequest request,
108 final EntityDetails entityDetails,
109 final ResponseChannel responseChannel,
110 final HttpContext context) throws HttpException, IOException {
111 final AtomicBoolean responseCommitted = new AtomicBoolean(false);
112
113 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
114 final HttpResponse responseWrapper = new HttpResponseWrapper(response){
115
116 private void ensureNotCommitted() {
117 Asserts.check(!responseCommitted.get(), "Response already committed");
118 }
119
120 @Override
121 public void addHeader(final String name, final Object value) {
122 ensureNotCommitted();
123 super.addHeader(name, value);
124 }
125
126 @Override
127 public void setHeader(final String name, final Object value) {
128 ensureNotCommitted();
129 super.setHeader(name, value);
130 }
131
132 @Override
133 public void setVersion(final ProtocolVersion version) {
134 ensureNotCommitted();
135 super.setVersion(version);
136 }
137
138 @Override
139 public void setCode(final int code) {
140 ensureNotCommitted();
141 super.setCode(code);
142 }
143
144 @Override
145 public void setReasonPhrase(final String reason) {
146 ensureNotCommitted();
147 super.setReasonPhrase(reason);
148 }
149
150 @Override
151 public void setLocale(final Locale locale) {
152 ensureNotCommitted();
153 super.setLocale(locale);
154 }
155
156 };
157
158 final InputStream inputStream;
159 if (entityDetails != null) {
160 inputBuffer = new SharedInputBuffer(initialBufferSize);
161 inputStream = new ContentInputStream(inputBuffer);
162 } else {
163 inputStream = null;
164 }
165 outputBuffer = new SharedOutputBuffer(initialBufferSize);
166
167 final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
168
169 private void triggerResponse() throws IOException {
170 try {
171 if (responseCommitted.compareAndSet(false, true)) {
172 responseChannel.sendResponse(response, new EntityDetails() {
173
174 @Override
175 public long getContentLength() {
176 return -1;
177 }
178
179 @Override
180 public String getContentType() {
181 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
182 return h != null ? h.getValue() : null;
183 }
184
185 @Override
186 public String getContentEncoding() {
187 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
188 return h != null ? h.getValue() : null;
189 }
190
191 @Override
192 public boolean isChunked() {
193 return false;
194 }
195
196 @Override
197 public Set<String> getTrailerNames() {
198 return null;
199 }
200
201 }, context);
202 }
203 } catch (final HttpException ex) {
204 throw new IOException(ex.getMessage(), ex);
205 }
206 }
207
208 @Override
209 public void close() throws IOException {
210 triggerResponse();
211 super.close();
212 }
213
214 @Override
215 public void write(final byte[] b, final int off, final int len) throws IOException {
216 triggerResponse();
217 super.write(b, off, len);
218 }
219
220 @Override
221 public void write(final byte[] b) throws IOException {
222 triggerResponse();
223 super.write(b);
224 }
225
226 @Override
227 public void write(final int b) throws IOException {
228 triggerResponse();
229 super.write(b);
230 }
231
232 };
233
234 if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
235 executor.execute(new Runnable() {
236
237 @Override
238 public void run() {
239 try {
240 handle(request, inputStream, responseWrapper, outputStream, context);
241 Closer.close(inputStream);
242 outputStream.close();
243 } catch (final Exception ex) {
244 exception.compareAndSet(null, ex);
245 if (inputBuffer != null) {
246 inputBuffer.abort();
247 }
248 outputBuffer.abort();
249 } finally {
250 state.set(State.COMPLETED);
251 }
252 }
253
254 });
255 }
256 }
257
258 @Override
259 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
260 if (inputBuffer != null) {
261 inputBuffer.updateCapacity(capacityChannel);
262 }
263 }
264
265 @Override
266 public final void consume(final ByteBuffer src) throws IOException {
267 Asserts.notNull(inputBuffer, "Input buffer");
268 inputBuffer.fill(src);
269 }
270
271 @Override
272 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
273 Asserts.notNull(inputBuffer, "Input buffer");
274 inputBuffer.markEndStream();
275 }
276
277 @Override
278 public final int available() {
279 Asserts.notNull(outputBuffer, "Output buffer");
280 return outputBuffer.length();
281 }
282
283 @Override
284 public final void produce(final DataStreamChannel channel) throws IOException {
285 Asserts.notNull(outputBuffer, "Output buffer");
286 outputBuffer.flush(channel);
287 }
288
289 @Override
290 public final void failed(final Exception cause) {
291 exception.compareAndSet(null, cause);
292 releaseResources();
293 }
294
295 @Override
296 public void releaseResources() {
297 }
298
299 }