1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.Header;
40 import org.apache.hc.core5.http.HttpConnection;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpRequest;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.URIScheme;
46 import org.apache.hc.core5.http.impl.Http1StreamListener;
47 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
48 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
49 import org.apache.hc.core5.http.message.BasicHttpResponse;
50 import org.apache.hc.core5.http.message.RequestLine;
51 import org.apache.hc.core5.http.message.StatusLine;
52 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
53 import org.apache.hc.core5.http.nio.CapacityChannel;
54 import org.apache.hc.core5.http.nio.DataStreamChannel;
55 import org.apache.hc.core5.http.nio.ResponseChannel;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.apache.hc.core5.io.CloseMode;
58 import org.apache.hc.core5.reactor.IOReactorConfig;
59 import org.apache.hc.core5.reactor.ListenerEndpoint;
60 import org.apache.hc.core5.util.TimeValue;
61
62
63
64
65 public class AsyncFullDuplexServerExample {
66
67 public static void main(final String[] args) throws Exception {
68 int port = 8080;
69 if (args.length >= 1) {
70 port = Integer.parseInt(args[0]);
71 }
72
73 final IOReactorConfig config = IOReactorConfig.custom()
74 .setSoTimeout(15, TimeUnit.SECONDS)
75 .setTcpNoDelay(true)
76 .build();
77
78 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
79 .setIOReactorConfig(config)
80 .setStreamListener(new Http1StreamListener() {
81
82 @Override
83 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
84 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
85 }
86
87 @Override
88 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
89 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
90 }
91
92 @Override
93 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
94 if (keepAlive) {
95 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
96 } else {
97 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
98 }
99 }
100
101 })
102 .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
103
104 @Override
105 public AsyncServerExchangeHandler get() {
106 return new AsyncServerExchangeHandler() {
107
108 ByteBuffer buffer = ByteBuffer.allocate(2048);
109 CapacityChannel inputCapacityChannel;
110 DataStreamChannel outputDataChannel;
111 boolean endStream;
112
113 private void ensureCapacity(final int chunk) {
114 if (buffer.remaining() < chunk) {
115 final ByteBuffer oldBuffer = buffer;
116 oldBuffer.flip();
117 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
118 buffer.put(oldBuffer);
119 }
120 }
121
122 @Override
123 public void handleRequest(
124 final HttpRequest request,
125 final EntityDetails entityDetails,
126 final ResponseChannel responseChannel,
127 final HttpContext context) throws HttpException, IOException {
128 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
129 responseChannel.sendResponse(response, entityDetails, context);
130 }
131
132 @Override
133 public void consume(final ByteBuffer src) throws IOException {
134 if (buffer.position() == 0) {
135 if (outputDataChannel != null) {
136 outputDataChannel.write(src);
137 }
138 }
139 if (src.hasRemaining()) {
140 ensureCapacity(src.remaining());
141 buffer.put(src);
142 if (outputDataChannel != null) {
143 outputDataChannel.requestOutput();
144 }
145 }
146 }
147
148 @Override
149 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
150 if (buffer.hasRemaining()) {
151 capacityChannel.update(buffer.remaining());
152 inputCapacityChannel = null;
153 } else {
154 inputCapacityChannel = capacityChannel;
155 }
156 }
157
158 @Override
159 public void streamEnd(final List<? extends Header> trailers) throws IOException {
160 endStream = true;
161 if (buffer.position() == 0) {
162 if (outputDataChannel != null) {
163 outputDataChannel.endStream();
164 }
165 } else {
166 if (outputDataChannel != null) {
167 outputDataChannel.requestOutput();
168 }
169 }
170 }
171
172 @Override
173 public int available() {
174 return buffer.position();
175 }
176
177 @Override
178 public void produce(final DataStreamChannel channel) throws IOException {
179 outputDataChannel = channel;
180 buffer.flip();
181 if (buffer.hasRemaining()) {
182 channel.write(buffer);
183 }
184 buffer.compact();
185 if (buffer.position() == 0 && endStream) {
186 channel.endStream();
187 }
188 final CapacityChannel capacityChannel = inputCapacityChannel;
189 if (capacityChannel != null && buffer.hasRemaining()) {
190 capacityChannel.update(buffer.remaining());
191 }
192 }
193
194 @Override
195 public void failed(final Exception cause) {
196 if (!(cause instanceof SocketException)) {
197 cause.printStackTrace(System.out);
198 }
199 }
200
201 @Override
202 public void releaseResources() {
203 }
204
205 };
206 }
207
208 })
209 .create();
210
211 Runtime.getRuntime().addShutdownHook(new Thread() {
212 @Override
213 public void run() {
214 System.out.println("HTTP server shutting down");
215 server.close(CloseMode.GRACEFUL);
216 }
217 });
218
219 server.start();
220 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
221 final ListenerEndpoint listenerEndpoint = future.get();
222 System.out.print("Listening on " + listenerEndpoint.getAddress());
223 server.awaitShutdown(TimeValue.MAX_VALUE);
224 }
225
226 }