1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.http.EntityDetails;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpConnection;
40 import org.apache.hc.core5.http.HttpException;
41 import org.apache.hc.core5.http.HttpRequest;
42 import org.apache.hc.core5.http.HttpResponse;
43 import org.apache.hc.core5.http.HttpStatus;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
46 import org.apache.hc.core5.http.message.BasicHttpResponse;
47 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
48 import org.apache.hc.core5.http.nio.CapacityChannel;
49 import org.apache.hc.core5.http.nio.DataStreamChannel;
50 import org.apache.hc.core5.http.nio.ResponseChannel;
51 import org.apache.hc.core5.http.protocol.HttpContext;
52 import org.apache.hc.core5.http2.HttpVersionPolicy;
53 import org.apache.hc.core5.http2.config.H2Config;
54 import org.apache.hc.core5.http2.frame.RawFrame;
55 import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
56 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
57 import org.apache.hc.core5.io.CloseMode;
58 import org.apache.hc.core5.reactor.IOReactorConfig;
59 import org.apache.hc.core5.reactor.ListenerEndpoint;
60 import org.apache.hc.core5.util.TimeValue;
61
62
63
64
65 public class H2FullDuplexServerExample {
66
67 public static void main(final String[] args) throws Exception {
68 int port = 8080;
69 if (args.length >= 1) {
70 port = Integer.parseInt(args[0]);
71 }
72
73 final IOReactorConfig config = IOReactorConfig.custom()
74 .setSoTimeout(15, TimeUnit.SECONDS)
75 .setTcpNoDelay(true)
76 .build();
77
78 final H2Config h2Config = H2Config.custom()
79 .setPushEnabled(true)
80 .setMaxConcurrentStreams(100)
81 .build();
82
83 final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
84 .setIOReactorConfig(config)
85 .setH2Config(h2Config)
86 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
87 .setStreamListener(new H2StreamListener() {
88
89 @Override
90 public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
91 for (int i = 0; i < headers.size(); i++) {
92 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
93 }
94 }
95
96 @Override
97 public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
98 for (int i = 0; i < headers.size(); i++) {
99 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
100 }
101 }
102
103 @Override
104 public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
105 }
106
107 @Override
108 public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
109 }
110
111 @Override
112 public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
113 }
114
115 @Override
116 public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
117 }
118
119 })
120 .register("/echo", () -> new AsyncServerExchangeHandler() {
121
122 ByteBuffer buffer = ByteBuffer.allocate(2048);
123 CapacityChannel inputCapacityChannel;
124 DataStreamChannel outputDataChannel;
125 boolean endStream;
126
127 private void ensureCapacity(final int chunk) {
128 if (buffer.remaining() < chunk) {
129 final ByteBuffer oldBuffer = buffer;
130 oldBuffer.flip();
131 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
132 buffer.put(oldBuffer);
133 }
134 }
135
136 @Override
137 public void handleRequest(
138 final HttpRequest request,
139 final EntityDetails entityDetails,
140 final ResponseChannel responseChannel,
141 final HttpContext context) throws HttpException, IOException {
142 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
143 responseChannel.sendResponse(response, entityDetails, context);
144 }
145
146 @Override
147 public void consume(final ByteBuffer src) throws IOException {
148 if (buffer.position() == 0) {
149 if (outputDataChannel != null) {
150 outputDataChannel.write(src);
151 }
152 }
153 if (src.hasRemaining()) {
154 ensureCapacity(src.remaining());
155 buffer.put(src);
156 if (outputDataChannel != null) {
157 outputDataChannel.requestOutput();
158 }
159 }
160 }
161
162 @Override
163 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
164 if (buffer.hasRemaining()) {
165 capacityChannel.update(buffer.remaining());
166 inputCapacityChannel = null;
167 } else {
168 inputCapacityChannel = capacityChannel;
169 }
170 }
171
172 @Override
173 public void streamEnd(final List<? extends Header> trailers) throws IOException {
174 endStream = true;
175 if (buffer.position() == 0) {
176 if (outputDataChannel != null) {
177 outputDataChannel.endStream();
178 }
179 } else {
180 if (outputDataChannel != null) {
181 outputDataChannel.requestOutput();
182 }
183 }
184 }
185
186 @Override
187 public int available() {
188 return buffer.position();
189 }
190
191 @Override
192 public void produce(final DataStreamChannel channel) throws IOException {
193 outputDataChannel = channel;
194 buffer.flip();
195 if (buffer.hasRemaining()) {
196 channel.write(buffer);
197 }
198 buffer.compact();
199 if (buffer.position() == 0 && endStream) {
200 channel.endStream();
201 }
202 final CapacityChannel capacityChannel = inputCapacityChannel;
203 if (capacityChannel != null && buffer.hasRemaining()) {
204 capacityChannel.update(buffer.remaining());
205 }
206 }
207
208 @Override
209 public void failed(final Exception cause) {
210 if (!(cause instanceof SocketException)) {
211 cause.printStackTrace(System.out);
212 }
213 }
214
215 @Override
216 public void releaseResources() {
217 }
218
219 })
220 .create();
221
222 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
223 System.out.println("HTTP server shutting down");
224 server.close(CloseMode.GRACEFUL);
225 }));
226
227 server.start();
228 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
229 final ListenerEndpoint listenerEndpoint = future.get();
230 System.out.print("Listening on " + listenerEndpoint.getAddress());
231 server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
232 }
233
234 }