1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.HttpRequest;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpStatus;
40 import org.apache.hc.core5.http.HttpVersion;
41 import org.apache.hc.core5.http.ProtocolException;
42 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
43 import org.apache.hc.core5.http.impl.nio.MessageState;
44 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45 import org.apache.hc.core5.http.nio.AsyncPushProducer;
46 import org.apache.hc.core5.http.nio.DataStreamChannel;
47 import org.apache.hc.core5.http.nio.HandlerFactory;
48 import org.apache.hc.core5.http.nio.ResponseChannel;
49 import org.apache.hc.core5.http.protocol.HttpContext;
50 import org.apache.hc.core5.http.protocol.HttpCoreContext;
51 import org.apache.hc.core5.http.protocol.HttpProcessor;
52 import org.apache.hc.core5.http2.H2ConnectionException;
53 import org.apache.hc.core5.http2.H2Error;
54 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
55 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
56
57 class ServerPushH2StreamHandler implements H2StreamHandler {
58
59 private final H2StreamChannel outputChannel;
60 private final DataStreamChannel dataChannel;
61 private final HttpProcessor httpProcessor;
62 private final BasicHttpConnectionMetrics connMetrics;
63 private final AsyncPushProducer pushProducer;
64 private final HttpCoreContext context;
65 private final AtomicBoolean responseCommitted;
66 private final AtomicBoolean failed;
67 private final AtomicBoolean done;
68
69 private volatile MessageState requestState;
70 private volatile MessageState responseState;
71
72 ServerPushH2StreamHandler(
73 final H2StreamChannel outputChannel,
74 final HttpProcessor httpProcessor,
75 final BasicHttpConnectionMetrics connMetrics,
76 final AsyncPushProducer pushProducer,
77 final HttpCoreContext context) {
78 this.outputChannel = outputChannel;
79 this.dataChannel = new DataStreamChannel() {
80
81 @Override
82 public void requestOutput() {
83 outputChannel.requestOutput();
84 }
85
86 @Override
87 public int write(final ByteBuffer src) throws IOException {
88 return outputChannel.write(src);
89 }
90
91 @Override
92 public void endStream(final List<? extends Header> trailers) throws IOException {
93 outputChannel.endStream(trailers);
94 responseState = MessageState.COMPLETE;
95 }
96
97 @Override
98 public void endStream() throws IOException {
99 outputChannel.endStream();
100 responseState = MessageState.COMPLETE;
101 }
102
103 };
104 this.httpProcessor = httpProcessor;
105 this.connMetrics = connMetrics;
106 this.pushProducer = pushProducer;
107 this.context = context;
108 this.responseCommitted = new AtomicBoolean(false);
109 this.failed = new AtomicBoolean(false);
110 this.done = new AtomicBoolean(false);
111 this.requestState = MessageState.COMPLETE;
112 this.responseState = MessageState.IDLE;
113 }
114
115 @Override
116 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
117 return null;
118 }
119
120 @Override
121 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
122 throw new ProtocolException("Unexpected message promise");
123 }
124
125 @Override
126 public void consumeHeader(final List<Header> requestHeaders, final boolean requestEndStream) throws HttpException, IOException {
127 throw new ProtocolException("Unexpected message headers");
128 }
129
130 @Override
131 public void updateInputCapacity() throws IOException {
132 }
133
134 @Override
135 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
136 throw new ProtocolException("Unexpected message data");
137 }
138
139 @Override
140 public boolean isOutputReady() {
141 switch (responseState) {
142 case IDLE:
143 return true;
144 case BODY:
145 return pushProducer.available() > 0;
146 default:
147 return false;
148 }
149 }
150
151 private void commitInformation(final HttpResponse response) throws IOException, HttpException {
152 if (responseCommitted.get()) {
153 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
154 }
155 final int status = response.getCode();
156 if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
157 throw new HttpException("Invalid intermediate response: " + status);
158 }
159 final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
160 outputChannel.submit(responseHeaders, false);
161 }
162
163 private void commitResponse(
164 final HttpResponse response,
165 final EntityDetails responseEntityDetails) throws HttpException, IOException {
166 if (responseCommitted.compareAndSet(false, true)) {
167
168 context.setProtocolVersion(HttpVersion.HTTP_2);
169 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
170 httpProcessor.process(response, responseEntityDetails, context);
171
172 final List<Header> headers = DefaultH2ResponseConverter.INSTANCE.convert(response);
173 outputChannel.submit(headers, responseEntityDetails == null);
174 connMetrics.incrementResponseCount();
175 if (responseEntityDetails == null) {
176 responseState = MessageState.COMPLETE;
177 } else {
178 responseState = MessageState.BODY;
179 pushProducer.produce(outputChannel);
180 }
181 }
182 }
183
184 private void commitPromise(
185 final HttpRequest promise,
186 final AsyncPushProducer pushProducer) throws HttpException, IOException {
187
188 context.setProtocolVersion(HttpVersion.HTTP_2);
189 context.setAttribute(HttpCoreContext.HTTP_REQUEST, promise);
190 httpProcessor.process(promise, null, context);
191
192 final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(promise);
193
194 outputChannel.push(headers, pushProducer);
195 connMetrics.incrementRequestCount();
196 }
197
198 @Override
199 public void produceOutput() throws HttpException, IOException {
200 switch (responseState) {
201 case IDLE:
202 responseState = MessageState.HEADERS;
203 pushProducer.produceResponse(new ResponseChannel() {
204
205 @Override
206 public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
207 commitInformation(response);
208 }
209
210 @Override
211 public void sendResponse(
212 final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
213 commitResponse(response, entityDetails);
214 }
215
216 @Override
217 public void pushPromise(
218 final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
219 commitPromise(promise, pushProducer);
220 }
221
222 }, context);
223 break;
224 case BODY:
225 pushProducer.produce(dataChannel);
226 break;
227 }
228 }
229
230 @Override
231 public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
232 throw ex;
233 }
234
235 @Override
236 public void failed(final Exception cause) {
237 try {
238 if (failed.compareAndSet(false, true)) {
239 pushProducer.failed(cause);
240 }
241 } finally {
242 releaseResources();
243 }
244 }
245
246 @Override
247 public void releaseResources() {
248 if (done.compareAndSet(false, true)) {
249 requestState = MessageState.COMPLETE;
250 responseState = MessageState.COMPLETE;
251 pushProducer.releaseResources();
252 }
253 }
254
255 @Override
256 public String toString() {
257 return "[" +
258 "requestState=" + requestState +
259 ", responseState=" + responseState +
260 ']';
261 }
262
263 }
264