View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive.examples;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.hc.core5.function.Callback;
36  import org.apache.hc.core5.function.Supplier;
37  import org.apache.hc.core5.http.ContentType;
38  import org.apache.hc.core5.http.EntityDetails;
39  import org.apache.hc.core5.http.HeaderElements;
40  import org.apache.hc.core5.http.HttpConnection;
41  import org.apache.hc.core5.http.HttpException;
42  import org.apache.hc.core5.http.HttpHeaders;
43  import org.apache.hc.core5.http.HttpRequest;
44  import org.apache.hc.core5.http.HttpResponse;
45  import org.apache.hc.core5.http.URIScheme;
46  import org.apache.hc.core5.http.impl.BasicEntityDetails;
47  import org.apache.hc.core5.http.impl.Http1StreamListener;
48  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
49  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
50  import org.apache.hc.core5.http.message.BasicHeader;
51  import org.apache.hc.core5.http.message.BasicHttpResponse;
52  import org.apache.hc.core5.http.message.RequestLine;
53  import org.apache.hc.core5.http.message.StatusLine;
54  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55  import org.apache.hc.core5.http.nio.ResponseChannel;
56  import org.apache.hc.core5.http.protocol.HttpContext;
57  import org.apache.hc.core5.io.CloseMode;
58  import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
59  import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
60  import org.apache.hc.core5.reactor.IOReactorConfig;
61  import org.apache.hc.core5.reactor.ListenerEndpoint;
62  import org.apache.hc.core5.util.TimeValue;
63  import org.reactivestreams.Publisher;
64  
65  /**
66   * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo server works out-of-the-box
67   * with {@link ReactiveFullDuplexClientExample}; it can also be invoked interactively using telnet.
68   */
69  public class ReactiveFullDuplexServerExample {
70      public static void main(final String[] args) throws Exception {
71          int port = 8080;
72          if (args.length >= 1) {
73              port = Integer.parseInt(args[0]);
74          }
75  
76          final IOReactorConfig config = IOReactorConfig.custom()
77              .setSoTimeout(15, TimeUnit.SECONDS)
78              .setTcpNoDelay(true)
79              .build();
80  
81          final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
82              .setIOReactorConfig(config)
83              .setStreamListener(new Http1StreamListener() {
84                  @Override
85                  public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
86                      System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
87  
88                  }
89  
90                  @Override
91                  public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
92                      System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
93                  }
94  
95                  @Override
96                  public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
97                      if (keepAlive) {
98                          System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
99                      } else {
100                         System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
101                     }
102                 }
103 
104             })
105             .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
106                 @Override
107                 public AsyncServerExchangeHandler get() {
108                     return new ReactiveServerExchangeHandler(new ReactiveRequestProcessor() {
109                         @Override
110                         public void processRequest(
111                             final HttpRequest request,
112                             final EntityDetails entityDetails,
113                             final ResponseChannel responseChannel,
114                             final HttpContext context,
115                             final Publisher<ByteBuffer> requestBody,
116                             final Callback<Publisher<ByteBuffer>> responseBodyFuture
117                         ) throws HttpException, IOException {
118                             if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
119                                 responseChannel.sendInformation(new BasicHttpResponse(100), context);
120                             }
121 
122                             responseChannel.sendResponse(
123                                 new BasicHttpResponse(200),
124                                 new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
125                                 context);
126 
127                             // Simply using the request publisher as the response publisher will
128                             // cause the server to echo the request body.
129                             responseBodyFuture.execute(requestBody);
130                         }
131                     });
132                 }
133             })
134             .create();
135 
136         Runtime.getRuntime().addShutdownHook(new Thread() {
137             @Override
138             public void run() {
139                 System.out.println("HTTP server shutting down");
140                 server.close(CloseMode.GRACEFUL);
141             }
142         });
143 
144         server.start();
145         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port), URIScheme.HTTP);
146         final ListenerEndpoint listenerEndpoint = future.get();
147         System.out.print("Listening on " + listenerEndpoint.getAddress());
148         server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
149     }
150 }