1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.HttpRequest;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpVersion;
40 import org.apache.hc.core5.http.ProtocolException;
41 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
42 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
43 import org.apache.hc.core5.http.impl.nio.MessageState;
44 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45 import org.apache.hc.core5.http.nio.HandlerFactory;
46 import org.apache.hc.core5.http.protocol.HttpCoreContext;
47 import org.apache.hc.core5.http.protocol.HttpProcessor;
48 import org.apache.hc.core5.http2.H2ConnectionException;
49 import org.apache.hc.core5.http2.H2Error;
50 import org.apache.hc.core5.http2.H2StreamResetException;
51 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
52 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
53 import org.apache.hc.core5.util.Asserts;
54
55 class ClientPushH2StreamHandler implements H2StreamHandler {
56
57 private final H2StreamChannel internalOutputChannel;
58 private final HttpProcessor httpProcessor;
59 private final BasicHttpConnectionMetrics connMetrics;
60 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61 private final HttpCoreContext context;
62 private final AtomicBoolean failed;
63 private final AtomicBoolean done;
64
65 private volatile HttpRequest request;
66 private volatile AsyncPushConsumer exchangeHandler;
67 private volatile MessageState requestState;
68 private volatile MessageState responseState;
69
70 ClientPushH2StreamHandler(
71 final H2StreamChannel outputChannel,
72 final HttpProcessor httpProcessor,
73 final BasicHttpConnectionMetrics connMetrics,
74 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
75 final HttpCoreContext context) {
76 this.internalOutputChannel = outputChannel;
77 this.httpProcessor = httpProcessor;
78 this.connMetrics = connMetrics;
79 this.pushHandlerFactory = pushHandlerFactory;
80 this.context = context;
81 this.failed = new AtomicBoolean(false);
82 this.done = new AtomicBoolean(false);
83 this.requestState = MessageState.HEADERS;
84 this.responseState = MessageState.HEADERS;
85 }
86
87 @Override
88 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
89 return pushHandlerFactory;
90 }
91
92 @Override
93 public boolean isOutputReady() {
94 return false;
95 }
96
97 @Override
98 public void produceOutput() throws HttpException, IOException {
99 }
100
101 @Override
102 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
103 if (requestState == MessageState.HEADERS) {
104
105 request = DefaultH2RequestConverter.INSTANCE.convert(headers);
106 try {
107 exchangeHandler = pushHandlerFactory != null ? pushHandlerFactory.create(request, context) : null;
108 } catch (final ProtocolException ex) {
109 exchangeHandler = new NoopAsyncPushHandler();
110 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
111 }
112 if (exchangeHandler == null) {
113 exchangeHandler = new NoopAsyncPushHandler();
114 throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
115 }
116
117 context.setProtocolVersion(HttpVersion.HTTP_2);
118 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
119
120 httpProcessor.process(request, null, context);
121 connMetrics.incrementRequestCount();
122 this.requestState = MessageState.COMPLETE;
123 } else {
124 throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promise");
125 }
126 }
127
128 @Override
129 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
130 if (responseState == MessageState.HEADERS) {
131 Asserts.notNull(request, "Request");
132 Asserts.notNull(exchangeHandler, "Exchange handler");
133
134 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
135 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
136
137 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
138 httpProcessor.process(response, entityDetails, context);
139 connMetrics.incrementResponseCount();
140
141 exchangeHandler.consumePromise(request, response, entityDetails, context);
142 if (endStream) {
143 responseState = MessageState.COMPLETE;
144 exchangeHandler.streamEnd(null);
145 } else {
146 responseState = MessageState.BODY;
147 }
148 } else {
149 throw new ProtocolException("Unexpected message headers");
150 }
151 }
152
153 @Override
154 public void updateInputCapacity() throws IOException {
155 Asserts.notNull(exchangeHandler, "Exchange handler");
156 exchangeHandler.updateCapacity(internalOutputChannel);
157 }
158
159 @Override
160 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
161 if (responseState != MessageState.BODY) {
162 throw new ProtocolException("Unexpected message data");
163 }
164 Asserts.notNull(exchangeHandler, "Exchange handler");
165 if (src != null) {
166 exchangeHandler.consume(src);
167 }
168 if (endStream) {
169 responseState = MessageState.COMPLETE;
170 exchangeHandler.streamEnd(null);
171 }
172 }
173
174 public boolean isDone() {
175 return responseState == MessageState.COMPLETE;
176 }
177
178 @Override
179 public void failed(final Exception cause) {
180 try {
181 if (failed.compareAndSet(false, true)) {
182 if (exchangeHandler != null) {
183 exchangeHandler.failed(cause);
184 }
185 }
186 } finally {
187 releaseResources();
188 }
189 }
190
191 @Override
192 public void handle(final HttpException ex, final boolean endStream) throws HttpException {
193 throw ex;
194 }
195
196 @Override
197 public void releaseResources() {
198 if (done.compareAndSet(false, true)) {
199 responseState = MessageState.COMPLETE;
200 requestState = MessageState.COMPLETE;
201 if (exchangeHandler != null) {
202 exchangeHandler.releaseResources();
203 }
204 }
205 }
206
207 @Override
208 public String toString() {
209 return "[" +
210 "requestState=" + requestState +
211 ", responseState=" + responseState +
212 ']';
213 }
214
215 }
216