1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.Header;
40 import org.apache.hc.core5.http.HttpConnection;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpRequest;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
46 import org.apache.hc.core5.http.message.BasicHttpResponse;
47 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
48 import org.apache.hc.core5.http.nio.CapacityChannel;
49 import org.apache.hc.core5.http.nio.DataStreamChannel;
50 import org.apache.hc.core5.http.nio.ResponseChannel;
51 import org.apache.hc.core5.http.protocol.HttpContext;
52 import org.apache.hc.core5.http2.HttpVersionPolicy;
53 import org.apache.hc.core5.http2.config.H2Config;
54 import org.apache.hc.core5.http2.frame.RawFrame;
55 import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
56 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
57 import org.apache.hc.core5.io.CloseMode;
58 import org.apache.hc.core5.reactor.IOReactorConfig;
59 import org.apache.hc.core5.reactor.ListenerEndpoint;
60 import org.apache.hc.core5.util.TimeValue;
61
62
63
64
65 public class H2FullDuplexServerExample {
66
67 public static void main(final String[] args) throws Exception {
68 int port = 8080;
69 if (args.length >= 1) {
70 port = Integer.parseInt(args[0]);
71 }
72
73 final IOReactorConfig config = IOReactorConfig.custom()
74 .setSoTimeout(15, TimeUnit.SECONDS)
75 .setTcpNoDelay(true)
76 .build();
77
78 final H2Config h2Config = H2Config.custom()
79 .setPushEnabled(true)
80 .setMaxConcurrentStreams(100)
81 .build();
82
83 final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
84 .setIOReactorConfig(config)
85 .setH2Config(h2Config)
86 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
87 .setStreamListener(new H2StreamListener() {
88
89 @Override
90 public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
91 for (int i = 0; i < headers.size(); i++) {
92 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i));
93 }
94 }
95
96 @Override
97 public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
98 for (int i = 0; i < headers.size(); i++) {
99 System.out.println(connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i));
100 }
101 }
102
103 @Override
104 public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
105 }
106
107 @Override
108 public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
109 }
110
111 @Override
112 public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
113 }
114
115 @Override
116 public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
117 }
118
119 })
120 .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
121
122 @Override
123 public AsyncServerExchangeHandler get() {
124 return new AsyncServerExchangeHandler() {
125
126 ByteBuffer buffer = ByteBuffer.allocate(2048);
127 CapacityChannel inputCapacityChannel;
128 DataStreamChannel outputDataChannel;
129 boolean endStream;
130
131 private void ensureCapacity(final int chunk) {
132 if (buffer.remaining() < chunk) {
133 final ByteBuffer oldBuffer = buffer;
134 oldBuffer.flip();
135 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
136 buffer.put(oldBuffer);
137 }
138 }
139
140 @Override
141 public void handleRequest(
142 final HttpRequest request,
143 final EntityDetails entityDetails,
144 final ResponseChannel responseChannel,
145 final HttpContext context) throws HttpException, IOException {
146 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
147 responseChannel.sendResponse(response, entityDetails, context);
148 }
149
150 @Override
151 public void consume(final ByteBuffer src) throws IOException {
152 if (buffer.position() == 0) {
153 if (outputDataChannel != null) {
154 outputDataChannel.write(src);
155 }
156 }
157 if (src.hasRemaining()) {
158 ensureCapacity(src.remaining());
159 buffer.put(src);
160 if (outputDataChannel != null) {
161 outputDataChannel.requestOutput();
162 }
163 }
164 }
165
166 @Override
167 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
168 if (buffer.hasRemaining()) {
169 capacityChannel.update(buffer.remaining());
170 inputCapacityChannel = null;
171 } else {
172 inputCapacityChannel = capacityChannel;
173 }
174 }
175
176 @Override
177 public void streamEnd(final List<? extends Header> trailers) throws IOException {
178 endStream = true;
179 if (buffer.position() == 0) {
180 if (outputDataChannel != null) {
181 outputDataChannel.endStream();
182 }
183 } else {
184 if (outputDataChannel != null) {
185 outputDataChannel.requestOutput();
186 }
187 }
188 }
189
190 @Override
191 public int available() {
192 return buffer.position();
193 }
194
195 @Override
196 public void produce(final DataStreamChannel channel) throws IOException {
197 outputDataChannel = channel;
198 buffer.flip();
199 if (buffer.hasRemaining()) {
200 channel.write(buffer);
201 }
202 buffer.compact();
203 if (buffer.position() == 0 && endStream) {
204 channel.endStream();
205 }
206 final CapacityChannel capacityChannel = inputCapacityChannel;
207 if (capacityChannel != null && buffer.hasRemaining()) {
208 capacityChannel.update(buffer.remaining());
209 }
210 }
211
212 @Override
213 public void failed(final Exception cause) {
214 if (!(cause instanceof SocketException)) {
215 cause.printStackTrace(System.out);
216 }
217 }
218
219 @Override
220 public void releaseResources() {
221 }
222
223 };
224 }
225
226 })
227 .create();
228
229 Runtime.getRuntime().addShutdownHook(new Thread() {
230 @Override
231 public void run() {
232 System.out.println("HTTP server shutting down");
233 server.close(CloseMode.GRACEFUL);
234 }
235 });
236
237 server.start();
238 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
239 final ListenerEndpoint listenerEndpoint = future.get();
240 System.out.print("Listening on " + listenerEndpoint.getAddress());
241 server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
242 }
243
244 }