1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.HttpRequest;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpVersion;
40 import org.apache.hc.core5.http.ProtocolException;
41 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
42 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
43 import org.apache.hc.core5.http.impl.nio.MessageState;
44 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45 import org.apache.hc.core5.http.nio.HandlerFactory;
46 import org.apache.hc.core5.http.protocol.HttpCoreContext;
47 import org.apache.hc.core5.http.protocol.HttpProcessor;
48 import org.apache.hc.core5.http2.H2ConnectionException;
49 import org.apache.hc.core5.http2.H2Error;
50 import org.apache.hc.core5.http2.H2StreamResetException;
51 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
52 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
53 import org.apache.hc.core5.util.Asserts;
54
55 class ClientPushH2StreamHandler implements H2StreamHandler {
56
57 private final H2StreamChannel internalOutputChannel;
58 private final HttpProcessor httpProcessor;
59 private final BasicHttpConnectionMetrics connMetrics;
60 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61 private final HttpCoreContext context;
62 private final AtomicBoolean failed;
63 private final AtomicBoolean done;
64
65 private volatile HttpRequest request;
66 private volatile AsyncPushConsumer exchangeHandler;
67 private volatile MessageState requestState;
68 private volatile MessageState responseState;
69
70 ClientPushH2StreamHandler(
71 final H2StreamChannel outputChannel,
72 final HttpProcessor httpProcessor,
73 final BasicHttpConnectionMetrics connMetrics,
74 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
75 final HttpCoreContext context) {
76 this.internalOutputChannel = outputChannel;
77 this.httpProcessor = httpProcessor;
78 this.connMetrics = connMetrics;
79 this.pushHandlerFactory = pushHandlerFactory;
80 this.context = context;
81 this.failed = new AtomicBoolean(false);
82 this.done = new AtomicBoolean(false);
83 this.requestState = MessageState.HEADERS;
84 this.responseState = MessageState.HEADERS;
85 }
86
87 @Override
88 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
89 return pushHandlerFactory;
90 }
91
92 @Override
93 public boolean isOutputReady() {
94 return false;
95 }
96
97 @Override
98 public void produceOutput() throws HttpException, IOException {
99 }
100
101 @Override
102 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
103 if (requestState == MessageState.HEADERS) {
104
105 request = DefaultH2RequestConverter.INSTANCE.convert(headers);
106
107 final AsyncPushConsumer handler;
108 try {
109 handler = pushHandlerFactory != null ? pushHandlerFactory.create(request, context) : null;
110 } catch (final ProtocolException ex) {
111 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
112 }
113 if (handler == null) {
114 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
115 }
116 exchangeHandler = handler;
117
118 context.setProtocolVersion(HttpVersion.HTTP_2);
119 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
120
121 httpProcessor.process(request, null, context);
122 connMetrics.incrementRequestCount();
123 this.requestState = MessageState.COMPLETE;
124 } else {
125 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promise");
126 }
127 }
128
129 @Override
130 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
131 if (responseState == MessageState.HEADERS) {
132 Asserts.notNull(request, "Request");
133 Asserts.notNull(exchangeHandler, "Exchange handler");
134
135 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
136 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
137
138 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
139 httpProcessor.process(response, entityDetails, context);
140 connMetrics.incrementResponseCount();
141
142 exchangeHandler.consumePromise(request, response, entityDetails, context);
143 if (endStream) {
144 responseState = MessageState.COMPLETE;
145 exchangeHandler.streamEnd(null);
146 } else {
147 responseState = MessageState.BODY;
148 }
149 } else {
150 throw new ProtocolException("Unexpected message headers");
151 }
152 }
153
154 @Override
155 public void updateInputCapacity() throws IOException {
156 Asserts.notNull(exchangeHandler, "Exchange handler");
157 exchangeHandler.updateCapacity(internalOutputChannel);
158 }
159
160 @Override
161 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
162 if (responseState != MessageState.BODY) {
163 throw new ProtocolException("Unexpected message data");
164 }
165 Asserts.notNull(exchangeHandler, "Exchange handler");
166 if (src != null) {
167 exchangeHandler.consume(src);
168 }
169 if (endStream) {
170 responseState = MessageState.COMPLETE;
171 exchangeHandler.streamEnd(null);
172 }
173 }
174
175 public boolean isDone() {
176 return responseState == MessageState.COMPLETE;
177 }
178
179 @Override
180 public void failed(final Exception cause) {
181 try {
182 if (failed.compareAndSet(false, true)) {
183 if (exchangeHandler != null) {
184 exchangeHandler.failed(cause);
185 }
186 }
187 } finally {
188 releaseResources();
189 }
190 }
191
192 @Override
193 public void handle(final HttpException ex, final boolean endStream) throws HttpException {
194 throw ex;
195 }
196
197 @Override
198 public void releaseResources() {
199 if (done.compareAndSet(false, true)) {
200 responseState = MessageState.COMPLETE;
201 requestState = MessageState.COMPLETE;
202 if (exchangeHandler != null) {
203 exchangeHandler.releaseResources();
204 }
205 }
206 }
207
208 @Override
209 public String toString() {
210 return "[" +
211 "requestState=" + requestState +
212 ", responseState=" + responseState +
213 ']';
214 }
215
216 }
217