1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.Header;
40 import org.apache.hc.core5.http.HttpConnection;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpRequest;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.URIScheme;
46 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
47 import org.apache.hc.core5.http.message.BasicHttpResponse;
48 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
49 import org.apache.hc.core5.http.nio.CapacityChannel;
50 import org.apache.hc.core5.http.nio.DataStreamChannel;
51 import org.apache.hc.core5.http.nio.ResponseChannel;
52 import org.apache.hc.core5.http.protocol.HttpContext;
53 import org.apache.hc.core5.http2.HttpVersionPolicy;
54 import org.apache.hc.core5.http2.config.H2Config;
55 import org.apache.hc.core5.http2.frame.RawFrame;
56 import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
57 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
58 import org.apache.hc.core5.io.CloseMode;
59 import org.apache.hc.core5.reactor.IOReactorConfig;
60 import org.apache.hc.core5.reactor.ListenerEndpoint;
61 import org.apache.hc.core5.util.TimeValue;
62
63
64
65
66 public class H2FullDuplexServerExample {
67
68 public static void main(final String[] args) throws Exception {
69 int port = 8080;
70 if (args.length >= 1) {
71 port = Integer.parseInt(args[0]);
72 }
73
74 final IOReactorConfig config = IOReactorConfig.custom()
75 .setSoTimeout(15, TimeUnit.SECONDS)
76 .setTcpNoDelay(true)
77 .build();
78
79 final H2Config h2Config = H2Config.custom()
80 .setPushEnabled(true)
81 .setMaxConcurrentStreams(100)
82 .build();
83
84 final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
85 .setIOReactorConfig(config)
86 .setH2Config(h2Config)
87 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
88 .setStreamListener(new H2StreamListener() {
89
90 @Override
91 public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
92 for (int i = 0; i < headers.size(); i++) {
93 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
94 }
95 }
96
97 @Override
98 public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
99 for (int i = 0; i < headers.size(); i++) {
100 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
101 }
102 }
103
104 @Override
105 public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
106 }
107
108 @Override
109 public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
110 }
111
112 @Override
113 public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
114 }
115
116 @Override
117 public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
118 }
119
120 })
121 .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
122
123 @Override
124 public AsyncServerExchangeHandler get() {
125 return new AsyncServerExchangeHandler() {
126
127 ByteBuffer buffer = ByteBuffer.allocate(2048);
128 CapacityChannel inputCapacityChannel;
129 DataStreamChannel outputDataChannel;
130 boolean endStream;
131
132 private void ensureCapacity(final int chunk) {
133 if (buffer.remaining() < chunk) {
134 final ByteBuffer oldBuffer = buffer;
135 oldBuffer.flip();
136 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
137 buffer.put(oldBuffer);
138 }
139 }
140
141 @Override
142 public void handleRequest(
143 final HttpRequest request,
144 final EntityDetails entityDetails,
145 final ResponseChannel responseChannel,
146 final HttpContext context) throws HttpException, IOException {
147 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
148 responseChannel.sendResponse(response, entityDetails, context);
149 }
150
151 @Override
152 public void consume(final ByteBuffer src) throws IOException {
153 if (buffer.position() == 0) {
154 if (outputDataChannel != null) {
155 outputDataChannel.write(src);
156 }
157 }
158 if (src.hasRemaining()) {
159 ensureCapacity(src.remaining());
160 buffer.put(src);
161 if (outputDataChannel != null) {
162 outputDataChannel.requestOutput();
163 }
164 }
165 }
166
167 @Override
168 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
169 if (buffer.hasRemaining()) {
170 capacityChannel.update(buffer.remaining());
171 inputCapacityChannel = null;
172 } else {
173 inputCapacityChannel = capacityChannel;
174 }
175 }
176
177 @Override
178 public void streamEnd(final List<? extends Header> trailers) throws IOException {
179 endStream = true;
180 if (buffer.position() == 0) {
181 if (outputDataChannel != null) {
182 outputDataChannel.endStream();
183 }
184 } else {
185 if (outputDataChannel != null) {
186 outputDataChannel.requestOutput();
187 }
188 }
189 }
190
191 @Override
192 public int available() {
193 return buffer.position();
194 }
195
196 @Override
197 public void produce(final DataStreamChannel channel) throws IOException {
198 outputDataChannel = channel;
199 buffer.flip();
200 if (buffer.hasRemaining()) {
201 channel.write(buffer);
202 }
203 buffer.compact();
204 if (buffer.position() == 0 && endStream) {
205 channel.endStream();
206 }
207 final CapacityChannel capacityChannel = inputCapacityChannel;
208 if (capacityChannel != null && buffer.hasRemaining()) {
209 capacityChannel.update(buffer.remaining());
210 }
211 }
212
213 @Override
214 public void failed(final Exception cause) {
215 if (!(cause instanceof SocketException)) {
216 cause.printStackTrace(System.out);
217 }
218 }
219
220 @Override
221 public void releaseResources() {
222 }
223
224 };
225 }
226
227 })
228 .create();
229
230 Runtime.getRuntime().addShutdownHook(new Thread() {
231 @Override
232 public void run() {
233 System.out.println("HTTP server shutting down");
234 server.close(CloseMode.GRACEFUL);
235 }
236 });
237
238 server.start();
239 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
240 final ListenerEndpoint listenerEndpoint = future.get();
241 System.out.print("Listening on " + listenerEndpoint.getAddress());
242 server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
243 }
244
245 }