View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive.examples;
28  
29  import static java.nio.charset.StandardCharsets.UTF_8;
30  
31  import java.net.URI;
32  import java.nio.ByteBuffer;
33  import java.util.Random;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.hc.core5.http.ContentType;
38  import org.apache.hc.core5.http.Header;
39  import org.apache.hc.core5.http.HttpConnection;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.Message;
43  import org.apache.hc.core5.http.impl.Http1StreamListener;
44  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
45  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46  import org.apache.hc.core5.http.message.RequestLine;
47  import org.apache.hc.core5.http.message.StatusLine;
48  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
49  import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
50  import org.apache.hc.core5.io.CloseMode;
51  import org.apache.hc.core5.reactive.ReactiveEntityProducer;
52  import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
53  import org.apache.hc.core5.reactor.IOReactorConfig;
54  import org.apache.hc.core5.util.Timeout;
55  import org.reactivestreams.Publisher;
56  
57  import io.reactivex.Flowable;
58  import io.reactivex.Notification;
59  import io.reactivex.Observable;
60  import io.reactivex.functions.Consumer;
61  import io.reactivex.functions.Function;
62  
63  /**
64   * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo will stream randomly
65   * generated text to the server via a POST request, while writing the response stream's events to standard output.
66   * This demo works out-of-the-box with {@link ReactiveFullDuplexServerExample}.
67   */
68  public class ReactiveFullDuplexClientExample {
69  
70      public static void main(final String[] args) throws Exception {
71          String endpoint = "http://localhost:8080/echo";
72          if (args.length >= 1) {
73              endpoint = args[0];
74          }
75  
76          // Create and start requester
77          final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
78              .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
79              .setStreamListener(new Http1StreamListener() {
80                  @Override
81                  public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
82                      System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
83  
84                  }
85  
86                  @Override
87                  public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
88                      System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
89                  }
90  
91                  @Override
92                  public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
93                      if (keepAlive) {
94                          System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
95                      } else {
96                          System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
97                      }
98                  }
99  
100             })
101             .create();
102 
103         Runtime.getRuntime().addShutdownHook(new Thread() {
104             @Override
105             public void run() {
106                 System.out.println("HTTP requester shutting down");
107                 requester.close(CloseMode.GRACEFUL);
108             }
109         });
110         requester.start();
111 
112         final Random random = new Random();
113         final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
114             .map(new Function<Integer, ByteBuffer>() {
115                 @Override
116                 public ByteBuffer apply(final Integer ignored) {
117                     final String str = random.nextDouble() + "\n";
118                     return ByteBuffer.wrap(str.getBytes(UTF_8));
119                 }
120             });
121         final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(new URI(endpoint))
122                 .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.TEXT_PLAIN, null))
123                 .build();
124 
125         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
126         final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
127         final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
128 
129         System.out.println(streamingResponse.getHead());
130         for (final Header header : streamingResponse.getHead().getHeaders()) {
131             System.out.println(header);
132         }
133         System.out.println();
134 
135         Observable.fromPublisher(streamingResponse.getBody())
136             .map(new Function<ByteBuffer, String>() {
137                 @Override
138                 public String apply(final ByteBuffer byteBuffer) {
139                     final byte[] string = new byte[byteBuffer.remaining()];
140                     byteBuffer.get(string);
141                     return new String(string);
142                 }
143             })
144             .materialize()
145             .forEach(new Consumer<Notification<String>>() {
146                 @Override
147                 public void accept(final Notification<String> byteBufferNotification) {
148                     System.out.println(byteBufferNotification);
149                 }
150             });
151 
152         responseComplete.get(1, TimeUnit.MINUTES);
153         System.out.println("Shutting down I/O reactor");
154         requester.initiateShutdown();
155     }
156 
157 }