1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HeaderElements;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpHeaders;
39 import org.apache.hc.core5.http.HttpRequest;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.HttpStatus;
42 import org.apache.hc.core5.http.HttpVersion;
43 import org.apache.hc.core5.http.ProtocolException;
44 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
45 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
46 import org.apache.hc.core5.http.impl.nio.MessageState;
47 import org.apache.hc.core5.http.message.StatusLine;
48 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
50 import org.apache.hc.core5.http.nio.DataStreamChannel;
51 import org.apache.hc.core5.http.nio.HandlerFactory;
52 import org.apache.hc.core5.http.nio.RequestChannel;
53 import org.apache.hc.core5.http.protocol.HttpContext;
54 import org.apache.hc.core5.http.protocol.HttpCoreContext;
55 import org.apache.hc.core5.http.protocol.HttpProcessor;
56 import org.apache.hc.core5.http2.H2ConnectionException;
57 import org.apache.hc.core5.http2.H2Error;
58 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
59 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
60
61 class ClientH2StreamHandler implements H2StreamHandler {
62
63 private final H2StreamChannel outputChannel;
64 private final DataStreamChannel dataChannel;
65 private final HttpProcessor httpProcessor;
66 private final BasicHttpConnectionMetrics connMetrics;
67 private final AsyncClientExchangeHandler exchangeHandler;
68 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
69 private final HttpCoreContext context;
70 private final AtomicBoolean requestCommitted;
71 private final AtomicBoolean failed;
72 private final AtomicBoolean done;
73
74 private volatile MessageState requestState;
75 private volatile MessageState responseState;
76
77 ClientH2StreamHandler(
78 final H2StreamChannel outputChannel,
79 final HttpProcessor httpProcessor,
80 final BasicHttpConnectionMetrics connMetrics,
81 final AsyncClientExchangeHandler exchangeHandler,
82 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
83 final HttpCoreContext context) {
84 this.outputChannel = outputChannel;
85 this.dataChannel = new DataStreamChannel() {
86
87 @Override
88 public void requestOutput() {
89 outputChannel.requestOutput();
90 }
91
92 @Override
93 public int write(final ByteBuffer src) throws IOException {
94 return outputChannel.write(src);
95 }
96
97 @Override
98 public void endStream(final List<? extends Header> trailers) throws IOException {
99 outputChannel.endStream(trailers);
100 requestState = MessageState.COMPLETE;
101 }
102
103 @Override
104 public void endStream() throws IOException {
105 outputChannel.endStream();
106 requestState = MessageState.COMPLETE;
107 }
108
109 };
110 this.httpProcessor = httpProcessor;
111 this.connMetrics = connMetrics;
112 this.exchangeHandler = exchangeHandler;
113 this.pushHandlerFactory = pushHandlerFactory;
114 this.context = context;
115 this.requestCommitted = new AtomicBoolean(false);
116 this.failed = new AtomicBoolean(false);
117 this.done = new AtomicBoolean(false);
118 this.requestState = MessageState.HEADERS;
119 this.responseState = MessageState.HEADERS;
120 }
121
122 @Override
123 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
124 return pushHandlerFactory;
125 }
126
127 @Override
128 public boolean isOutputReady() {
129 switch (requestState) {
130 case HEADERS:
131 return true;
132 case BODY:
133 return exchangeHandler.available() > 0;
134 default:
135 return false;
136 }
137 }
138
139 private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
140 if (requestCommitted.compareAndSet(false, true)) {
141 context.setProtocolVersion(HttpVersion.HTTP_2);
142 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
143
144 httpProcessor.process(request, entityDetails, context);
145
146 final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
147 outputChannel.submit(headers, entityDetails == null);
148 connMetrics.incrementRequestCount();
149
150 if (entityDetails == null) {
151 requestState = MessageState.COMPLETE;
152 } else {
153 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
154 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
155 if (expectContinue) {
156 requestState = MessageState.ACK;
157 } else {
158 requestState = MessageState.BODY;
159 exchangeHandler.produce(dataChannel);
160 }
161 }
162 } else {
163 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Request already committed");
164 }
165 }
166
167 @Override
168 public void produceOutput() throws HttpException, IOException {
169 switch (requestState) {
170 case HEADERS:
171 exchangeHandler.produceRequest(new RequestChannel() {
172
173 @Override
174 public void sendRequest(
175 final HttpRequest request, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
176 commitRequest(request, entityDetails);
177 }
178
179 }, context);
180 break;
181 case BODY:
182 exchangeHandler.produce(dataChannel);
183 break;
184 }
185 }
186
187 @Override
188 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
189 throw new ProtocolException("Unexpected message promise");
190 }
191
192 @Override
193 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
194 if (done.get()) {
195 throw new ProtocolException("Unexpected message headers");
196 }
197 switch (responseState) {
198 case HEADERS:
199 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
200 final int status = response.getCode();
201 if (status < HttpStatus.SC_INFORMATIONAL) {
202 throw new ProtocolException("Invalid response: " + new StatusLine(response));
203 }
204 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
205 exchangeHandler.consumeInformation(response, context);
206 }
207 if (requestState == MessageState.ACK) {
208 if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
209 requestState = MessageState.BODY;
210 exchangeHandler.produce(dataChannel);
211 }
212 }
213 if (status < HttpStatus.SC_SUCCESS) {
214 return;
215 }
216
217 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(response, -1);
218 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
219 httpProcessor.process(response, entityDetails, context);
220 connMetrics.incrementResponseCount();
221
222 exchangeHandler.consumeResponse(response, entityDetails, context);
223 responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
224 break;
225 case BODY:
226 responseState = MessageState.COMPLETE;
227 exchangeHandler.streamEnd(headers);
228 break;
229 default:
230 throw new ProtocolException("Unexpected message headers");
231 }
232 }
233
234 @Override
235 public void updateInputCapacity() throws IOException {
236 exchangeHandler.updateCapacity(outputChannel);
237 }
238
239 @Override
240 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
241 if (done.get() || responseState != MessageState.BODY) {
242 throw new ProtocolException("Unexpected message data");
243 }
244 if (src != null) {
245 exchangeHandler.consume(src);
246 }
247 if (endStream) {
248 responseState = MessageState.COMPLETE;
249 exchangeHandler.streamEnd(null);
250 }
251 }
252
253 @Override
254 public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
255 throw ex;
256 }
257
258 @Override
259 public void failed(final Exception cause) {
260 try {
261 if (failed.compareAndSet(false, true)) {
262 if (exchangeHandler != null) {
263 exchangeHandler.failed(cause);
264 }
265 }
266 } finally {
267 releaseResources();
268 }
269 }
270
271 @Override
272 public void releaseResources() {
273 if (done.compareAndSet(false, true)) {
274 responseState = MessageState.COMPLETE;
275 requestState = MessageState.COMPLETE;
276 exchangeHandler.releaseResources();
277 }
278 }
279
280 @Override
281 public String toString() {
282 return "[" +
283 "requestState=" + requestState +
284 ", responseState=" + responseState +
285 ']';
286 }
287
288 }
289