1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.concurrent.FutureCallback;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpRequest;
39 import org.apache.hc.core5.http.HttpResponse;
40 import org.apache.hc.core5.http.HttpStatus;
41 import org.apache.hc.core5.http.nio.AsyncPushProducer;
42 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
43 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
44 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
45 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
46 import org.apache.hc.core5.http.nio.CapacityChannel;
47 import org.apache.hc.core5.http.nio.DataStreamChannel;
48 import org.apache.hc.core5.http.nio.ResponseChannel;
49 import org.apache.hc.core5.http.protocol.HttpContext;
50 import org.apache.hc.core5.util.Asserts;
51
52
53
54
55
56
57 public abstract class AbstractServerExchangeHandler<T> implements AsyncServerExchangeHandler {
58
59 private final AtomicReference<AsyncRequestConsumer<T>> requestConsumerRef;
60 private final AtomicReference<AsyncResponseProducer> responseProducerRef;
61
62 public AbstractServerExchangeHandler() {
63 this.requestConsumerRef = new AtomicReference<>(null);
64 this.responseProducerRef = new AtomicReference<>(null);
65 }
66
67
68
69
70
71
72
73
74 protected abstract AsyncRequestConsumer<T> supplyConsumer(
75 HttpRequest request,
76 EntityDetails entityDetails,
77 HttpContext context) throws HttpException;
78
79
80
81
82
83
84
85
86
87
88
89 protected abstract void handle(
90 T requestMessage,
91 AsyncServerRequestHandler.ResponseTrigger responseTrigger,
92 HttpContext context) throws HttpException, IOException;
93
94 @Override
95 public final void handleRequest(
96 final HttpRequest request,
97 final EntityDetails entityDetails,
98 final ResponseChannel responseChannel,
99 final HttpContext context) throws HttpException, IOException {
100
101 final AsyncRequestConsumer<T> requestConsumer = supplyConsumer(request, entityDetails, context);
102 if (requestConsumer == null) {
103 throw new HttpException("Unable to handle request");
104 }
105 requestConsumerRef.set(requestConsumer);
106 final AsyncServerRequestHandler.ResponseTrigger responseTrigger = new AsyncServerRequestHandler.ResponseTrigger() {
107
108 @Override
109 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
110 responseChannel.sendInformation(response, httpContext);
111 }
112
113 @Override
114 public void submitResponse(
115 final AsyncResponseProducer producer, final HttpContext httpContext) throws HttpException, IOException {
116 if (responseProducerRef.compareAndSet(null, producer)) {
117 producer.sendResponse(responseChannel, httpContext);
118 }
119 }
120
121 @Override
122 public void pushPromise(
123 final HttpRequest promise, final HttpContext httpContext, final AsyncPushProducer pushProducer) throws HttpException, IOException {
124 responseChannel.pushPromise(promise, pushProducer, httpContext);
125 }
126
127 @Override
128 public String toString() {
129 return "Response trigger: " + responseChannel;
130 }
131
132 };
133 requestConsumer.consumeRequest(request, entityDetails, context, new FutureCallback<T>() {
134
135 @Override
136 public void completed(final T result) {
137 try {
138 handle(result, responseTrigger, context);
139 } catch (final HttpException ex) {
140 try {
141 responseTrigger.submitResponse(
142 AsyncResponseBuilder.create(HttpStatus.SC_INTERNAL_SERVER_ERROR)
143 .setEntity(ex.getMessage())
144 .build(),
145 context);
146 } catch (final HttpException | IOException ex2) {
147 failed(ex2);
148 }
149 } catch (final IOException ex) {
150 failed(ex);
151 }
152 }
153
154 @Override
155 public void failed(final Exception ex) {
156 AbstractServerExchangeHandler.this.failed(ex);
157 }
158
159 @Override
160 public void cancelled() {
161 releaseResources();
162 }
163
164 });
165
166 }
167
168 @Override
169 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
170 final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
171 Asserts.notNull(requestConsumer, "Data consumer");
172 requestConsumer.updateCapacity(capacityChannel);
173 }
174
175 @Override
176 public final void consume(final ByteBuffer src) throws IOException {
177 final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
178 Asserts.notNull(requestConsumer, "Data consumer");
179 requestConsumer.consume(src);
180 }
181
182 @Override
183 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
184 final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
185 Asserts.notNull(requestConsumer, "Data consumer");
186 requestConsumer.streamEnd(trailers);
187 }
188
189 @Override
190 public final int available() {
191 final AsyncResponseProducer dataProducer = responseProducerRef.get();
192 return dataProducer != null ? dataProducer.available() : 0;
193 }
194
195 @Override
196 public final void produce(final DataStreamChannel channel) throws IOException {
197 final AsyncResponseProducer dataProducer = responseProducerRef.get();
198 Asserts.notNull(dataProducer, "Data producer");
199 dataProducer.produce(channel);
200 }
201
202 @Override
203 public final void failed(final Exception cause) {
204 try {
205 final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
206 if (requestConsumer != null) {
207 requestConsumer.failed(cause);
208 }
209 final AsyncResponseProducer dataProducer = responseProducerRef.get();
210 if (dataProducer != null) {
211 dataProducer.failed(cause);
212 }
213 } finally {
214 releaseResources();
215 }
216 }
217
218 @Override
219 public final void releaseResources() {
220 final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.getAndSet(null);
221 if (requestConsumer != null) {
222 requestConsumer.releaseResources();
223 }
224 final AsyncResponseProducer dataProducer = responseProducerRef.getAndSet(null);
225 if (dataProducer != null) {
226 dataProducer.releaseResources();
227 }
228 }
229
230 }