View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive.examples;
28  
29  import static java.nio.charset.StandardCharsets.UTF_8;
30  
31  import java.net.URI;
32  import java.nio.ByteBuffer;
33  import java.util.Random;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.hc.core5.http.ContentType;
38  import org.apache.hc.core5.http.Header;
39  import org.apache.hc.core5.http.HttpConnection;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.Message;
43  import org.apache.hc.core5.http.impl.Http1StreamListener;
44  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
45  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46  import org.apache.hc.core5.http.message.RequestLine;
47  import org.apache.hc.core5.http.message.StatusLine;
48  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
49  import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
50  import org.apache.hc.core5.io.CloseMode;
51  import org.apache.hc.core5.reactive.ReactiveEntityProducer;
52  import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
53  import org.apache.hc.core5.reactor.IOReactorConfig;
54  import org.apache.hc.core5.util.Timeout;
55  import org.reactivestreams.Publisher;
56  
57  import io.reactivex.rxjava3.core.Flowable;
58  import io.reactivex.rxjava3.core.Observable;
59  
60  /**
61   * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo will stream randomly
62   * generated text to the server via a POST request, while writing the response stream's events to standard output.
63   * This demo works out-of-the-box with {@link ReactiveFullDuplexServerExample}.
64   */
65  public class ReactiveFullDuplexClientExample {
66  
67      public static void main(final String[] args) throws Exception {
68          String endpoint = "http://localhost:8080/echo";
69          if (args.length >= 1) {
70              endpoint = args[0];
71          }
72  
73          // Create and start requester
74          final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
75              .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
76              .setStreamListener(new Http1StreamListener() {
77                  @Override
78                  public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
79                      System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
80  
81                  }
82  
83                  @Override
84                  public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
85                      System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
86                  }
87  
88                  @Override
89                  public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
90                      if (keepAlive) {
91                          System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
92                      } else {
93                          System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
94                      }
95                  }
96  
97              })
98              .create();
99  
100         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
101             System.out.println("HTTP requester shutting down");
102             requester.close(CloseMode.GRACEFUL);
103         }));
104         requester.start();
105 
106         final Random random = new Random();
107         final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
108             .map(ignored -> {
109                 final String str = random.nextDouble() + "\n";
110                 return ByteBuffer.wrap(str.getBytes(UTF_8));
111             });
112         final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(new URI(endpoint))
113                 .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.TEXT_PLAIN, null))
114                 .build();
115 
116         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
117         final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
118         final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
119 
120         System.out.println(streamingResponse.getHead());
121         for (final Header header : streamingResponse.getHead().getHeaders()) {
122             System.out.println(header);
123         }
124         System.out.println();
125 
126         Observable.fromPublisher(streamingResponse.getBody())
127             .map(byteBuffer -> {
128                 final byte[] string = new byte[byteBuffer.remaining()];
129                 byteBuffer.get(string);
130                 return new String(string);
131             })
132             .materialize()
133             .forEach(System.out::println);
134 
135         responseComplete.get(1, TimeUnit.MINUTES);
136         System.out.println("Shutting down I/O reactor");
137         requester.initiateShutdown();
138     }
139 
140 }