1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.hc.core5.function.Callback;
36 import org.apache.hc.core5.function.Supplier;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.HeaderElements;
40 import org.apache.hc.core5.http.HttpConnection;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpHeaders;
43 import org.apache.hc.core5.http.HttpRequest;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.URIScheme;
46 import org.apache.hc.core5.http.impl.BasicEntityDetails;
47 import org.apache.hc.core5.http.impl.Http1StreamListener;
48 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
49 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
50 import org.apache.hc.core5.http.message.BasicHeader;
51 import org.apache.hc.core5.http.message.BasicHttpResponse;
52 import org.apache.hc.core5.http.message.RequestLine;
53 import org.apache.hc.core5.http.message.StatusLine;
54 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55 import org.apache.hc.core5.http.nio.ResponseChannel;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.apache.hc.core5.io.CloseMode;
58 import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
59 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
60 import org.apache.hc.core5.reactor.IOReactorConfig;
61 import org.apache.hc.core5.reactor.ListenerEndpoint;
62 import org.apache.hc.core5.util.TimeValue;
63 import org.reactivestreams.Publisher;
64
65
66
67
68
69 public class ReactiveFullDuplexServerExample {
70 public static void main(final String[] args) throws Exception {
71 int port = 8080;
72 if (args.length >= 1) {
73 port = Integer.parseInt(args[0]);
74 }
75
76 final IOReactorConfig config = IOReactorConfig.custom()
77 .setSoTimeout(15, TimeUnit.SECONDS)
78 .setTcpNoDelay(true)
79 .build();
80
81 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
82 .setIOReactorConfig(config)
83 .setStreamListener(new Http1StreamListener() {
84 @Override
85 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
86 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
87
88 }
89
90 @Override
91 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
92 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
93 }
94
95 @Override
96 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
97 if (keepAlive) {
98 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
99 } else {
100 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
101 }
102 }
103
104 })
105 .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
106 @Override
107 public AsyncServerExchangeHandler get() {
108 return new ReactiveServerExchangeHandler(new ReactiveRequestProcessor() {
109 @Override
110 public void processRequest(
111 final HttpRequest request,
112 final EntityDetails entityDetails,
113 final ResponseChannel responseChannel,
114 final HttpContext context,
115 final Publisher<ByteBuffer> requestBody,
116 final Callback<Publisher<ByteBuffer>> responseBodyFuture
117 ) throws HttpException, IOException {
118 if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
119 responseChannel.sendInformation(new BasicHttpResponse(100), context);
120 }
121
122 responseChannel.sendResponse(
123 new BasicHttpResponse(200),
124 new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
125 context);
126
127
128
129 responseBodyFuture.execute(requestBody);
130 }
131 });
132 }
133 })
134 .create();
135
136 Runtime.getRuntime().addShutdownHook(new Thread() {
137 @Override
138 public void run() {
139 System.out.println("HTTP server shutting down");
140 server.close(CloseMode.GRACEFUL);
141 }
142 });
143
144 server.start();
145 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
146 final ListenerEndpoint listenerEndpoint = future.get();
147 System.out.print("Listening on " + listenerEndpoint.getAddress());
148 server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
149 }
150 }