1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive.examples;
28
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.impl.Http1StreamListener;
import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.util.TimeValue;
53
54
55
56
57
58 public class ReactiveFullDuplexServerExample {
59 public static void main(final String[] args) throws Exception {
60 int port = 8080;
61 if (args.length >= 1) {
62 port = Integer.parseInt(args[0]);
63 }
64
65 final IOReactorConfig config = IOReactorConfig.custom()
66 .setSoTimeout(15, TimeUnit.SECONDS)
67 .setTcpNoDelay(true)
68 .build();
69
70 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
71 .setExceptionCallback(e -> e.printStackTrace())
72 .setIOReactorConfig(config)
73 .setStreamListener(new Http1StreamListener() {
74 @Override
75 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
76 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
77
78 }
79
80 @Override
81 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
82 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
83 }
84
85 @Override
86 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
87 if (keepAlive) {
88 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
89 } else {
90 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
91 }
92 }
93
94 })
95 .register("/echo", () -> new ReactiveServerExchangeHandler((request, entityDetails, responseChannel, context, requestBody, responseBodyFuture) -> {
96 if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
97 responseChannel.sendInformation(new BasicHttpResponse(100), context);
98 }
99
100 responseChannel.sendResponse(
101 new BasicHttpResponse(200),
102 new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
103 context);
104
105
106
107 responseBodyFuture.execute(requestBody);
108 }))
109 .create();
110
111 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
112 System.out.println("HTTP server shutting down");
113 server.close(CloseMode.GRACEFUL);
114 }));
115
116 server.start();
117 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
118 final ListenerEndpoint listenerEndpoint = future.get();
119 System.out.print("Listening on " + listenerEndpoint.getAddress());
120 server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
121 }
122 }