1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.support;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.function.Callback;
36 import org.apache.hc.core5.http.EntityDetails;
37 import org.apache.hc.core5.http.Header;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpRequest;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
42 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
43 import org.apache.hc.core5.http.nio.AsyncFilterChain;
44 import org.apache.hc.core5.http.nio.AsyncPushProducer;
45 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
46 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
47 import org.apache.hc.core5.http.nio.CapacityChannel;
48 import org.apache.hc.core5.http.nio.DataStreamChannel;
49 import org.apache.hc.core5.http.nio.HandlerFactory;
50 import org.apache.hc.core5.http.nio.ResponseChannel;
51 import org.apache.hc.core5.http.protocol.HttpContext;
52 import org.apache.hc.core5.util.Args;
53 import org.apache.hc.core5.util.Asserts;
54
55
56
57
58
59
60
61 public final class AsyncServerFilterChainExchangeHandlerFactory implements HandlerFactory<AsyncServerExchangeHandler> {
62
63 private final AsyncServerFilterChainElement filterChain;
64 private final Callback<Exception> exceptionCallback;
65
66 public AsyncServerFilterChainExchangeHandlerFactory(final AsyncServerFilterChainElement filterChain,
67 final Callback<Exception> exceptionCallback) {
68 this.filterChain = Args.notNull(filterChain, "Filter chain");
69 this.exceptionCallback = exceptionCallback;
70 }
71
72 public AsyncServerFilterChainExchangeHandlerFactory(final AsyncServerFilterChainElement filterChain) {
73 this(filterChain, null);
74 }
75
76 @Override
77 public AsyncServerExchangeHandler create(final HttpRequest request, final HttpContext context) throws HttpException {
78 return new AsyncServerExchangeHandler() {
79
80 private final AtomicReference<AsyncDataConsumer> dataConsumerRef = new AtomicReference<>();
81 private final AtomicReference<AsyncResponseProducer> responseProducerRef = new AtomicReference<>();
82
83 @Override
84 public void handleRequest(
85 final HttpRequest request,
86 final EntityDetails entityDetails,
87 final ResponseChannel responseChannel,
88 final HttpContext context) throws HttpException, IOException {
89 dataConsumerRef.set(filterChain.handle(request, entityDetails, context, new AsyncFilterChain.ResponseTrigger() {
90
91 @Override
92 public void sendInformation(
93 final HttpResponse response) throws HttpException, IOException {
94 responseChannel.sendInformation(response, context);
95 }
96
97 @Override
98 public void submitResponse(
99 final HttpResponse response,
100 final AsyncEntityProducer entityProducer) throws HttpException, IOException {
101 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, entityProducer);
102 responseProducerRef.set(responseProducer);
103 responseProducer.sendResponse(responseChannel, context);
104 }
105
106 @Override
107 public void pushPromise(final HttpRequest promise, final AsyncPushProducer responseProducer) throws HttpException, IOException {
108 responseChannel.pushPromise(promise, responseProducer, context);
109 }
110
111 }));
112 }
113
114 @Override
115 public void failed(final Exception cause) {
116 if (exceptionCallback != null) {
117 exceptionCallback.execute(cause);
118 }
119 final AsyncResponseProducer handler = responseProducerRef.get();
120 if (handler != null) {
121 handler.failed(cause);
122 }
123 }
124
125 @Override
126 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
127 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
128 if (dataConsumer != null) {
129 dataConsumer.updateCapacity(capacityChannel);
130 } else {
131 capacityChannel.update(Integer.MAX_VALUE);
132 }
133 }
134
135 @Override
136 public void consume(final ByteBuffer src) throws IOException {
137 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
138 if (dataConsumer != null) {
139 dataConsumer.consume(src);
140 }
141 }
142
143 @Override
144 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
145 final AsyncDataConsumer dataConsumer = dataConsumerRef.get();
146 if (dataConsumer != null) {
147 dataConsumer.streamEnd(trailers);
148 }
149 }
150
151 @Override
152 public int available() {
153 final AsyncResponseProducer responseProducer = responseProducerRef.get();
154 Asserts.notNull(responseProducer, "Response producer");
155 return responseProducer.available();
156 }
157
158 @Override
159 public void produce(final DataStreamChannel channel) throws IOException {
160 final AsyncResponseProducer responseProducer = responseProducerRef.get();
161 Asserts.notNull(responseProducer, "Response producer");
162 responseProducer.produce(channel);
163 }
164
165 @Override
166 public void releaseResources() {
167 final AsyncDataConsumer dataConsumer = dataConsumerRef.getAndSet(null);
168 if (dataConsumer != null) {
169 dataConsumer.releaseResources();
170 }
171 final AsyncResponseProducer responseProducer = responseProducerRef.getAndSet(null);
172 if (responseProducer != null) {
173 responseProducer.releaseResources();
174 }
175 }
176 };
177 }
178
179 }