1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.Header;
40 import org.apache.hc.core5.http.HttpConnection;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpRequest;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.impl.Http1StreamListener;
46 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
47 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
48 import org.apache.hc.core5.http.message.BasicHttpResponse;
49 import org.apache.hc.core5.http.message.RequestLine;
50 import org.apache.hc.core5.http.message.StatusLine;
51 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
52 import org.apache.hc.core5.http.nio.CapacityChannel;
53 import org.apache.hc.core5.http.nio.DataStreamChannel;
54 import org.apache.hc.core5.http.nio.ResponseChannel;
55 import org.apache.hc.core5.http.protocol.HttpContext;
56 import org.apache.hc.core5.io.CloseMode;
57 import org.apache.hc.core5.reactor.IOReactorConfig;
58 import org.apache.hc.core5.reactor.ListenerEndpoint;
59 import org.apache.hc.core5.util.TimeValue;
60
61
62
63
64 public class AsyncFullDuplexServerExample {
65
66 public static void main(final String[] args) throws Exception {
67 int port = 8080;
68 if (args.length >= 1) {
69 port = Integer.parseInt(args[0]);
70 }
71
72 final IOReactorConfig config = IOReactorConfig.custom()
73 .setSoTimeout(15, TimeUnit.SECONDS)
74 .setTcpNoDelay(true)
75 .build();
76
77 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
78 .setIOReactorConfig(config)
79 .setStreamListener(new Http1StreamListener() {
80
81 @Override
82 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
83 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
84 }
85
86 @Override
87 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
88 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
89 }
90
91 @Override
92 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
93 if (keepAlive) {
94 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
95 } else {
96 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
97 }
98 }
99
100 })
101 .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
102
103 @Override
104 public AsyncServerExchangeHandler get() {
105 return new AsyncServerExchangeHandler() {
106
107 ByteBuffer buffer = ByteBuffer.allocate(2048);
108 CapacityChannel inputCapacityChannel;
109 DataStreamChannel outputDataChannel;
110 boolean endStream;
111
112 private void ensureCapacity(final int chunk) {
113 if (buffer.remaining() < chunk) {
114 final ByteBuffer oldBuffer = buffer;
115 oldBuffer.flip();
116 buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
117 buffer.put(oldBuffer);
118 }
119 }
120
121 @Override
122 public void handleRequest(
123 final HttpRequest request,
124 final EntityDetails entityDetails,
125 final ResponseChannel responseChannel,
126 final HttpContext context) throws HttpException, IOException {
127 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
128 responseChannel.sendResponse(response, entityDetails, context);
129 }
130
131 @Override
132 public void consume(final ByteBuffer src) throws IOException {
133 if (buffer.position() == 0) {
134 if (outputDataChannel != null) {
135 outputDataChannel.write(src);
136 }
137 }
138 if (src.hasRemaining()) {
139 ensureCapacity(src.remaining());
140 buffer.put(src);
141 if (outputDataChannel != null) {
142 outputDataChannel.requestOutput();
143 }
144 }
145 }
146
147 @Override
148 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
149 if (buffer.hasRemaining()) {
150 capacityChannel.update(buffer.remaining());
151 inputCapacityChannel = null;
152 } else {
153 inputCapacityChannel = capacityChannel;
154 }
155 }
156
157 @Override
158 public void streamEnd(final List<? extends Header> trailers) throws IOException {
159 endStream = true;
160 if (buffer.position() == 0) {
161 if (outputDataChannel != null) {
162 outputDataChannel.endStream();
163 }
164 } else {
165 if (outputDataChannel != null) {
166 outputDataChannel.requestOutput();
167 }
168 }
169 }
170
171 @Override
172 public int available() {
173 return buffer.position();
174 }
175
176 @Override
177 public void produce(final DataStreamChannel channel) throws IOException {
178 outputDataChannel = channel;
179 buffer.flip();
180 if (buffer.hasRemaining()) {
181 channel.write(buffer);
182 }
183 buffer.compact();
184 if (buffer.position() == 0 && endStream) {
185 channel.endStream();
186 }
187 final CapacityChannel capacityChannel = inputCapacityChannel;
188 if (capacityChannel != null && buffer.hasRemaining()) {
189 capacityChannel.update(buffer.remaining());
190 }
191 }
192
193 @Override
194 public void failed(final Exception cause) {
195 if (!(cause instanceof SocketException)) {
196 cause.printStackTrace(System.out);
197 }
198 }
199
200 @Override
201 public void releaseResources() {
202 }
203
204 };
205 }
206
207 })
208 .create();
209
210 Runtime.getRuntime().addShutdownHook(new Thread() {
211 @Override
212 public void run() {
213 System.out.println("HTTP server shutting down");
214 server.close(CloseMode.GRACEFUL);
215 }
216 });
217
218 server.start();
219 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
220 final ListenerEndpoint listenerEndpoint = future.get();
221 System.out.print("Listening on " + listenerEndpoint.getAddress());
222 server.awaitShutdown(TimeValue.MAX_VALUE);
223 }
224
225 }