1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive.examples;
28
29 import static java.nio.charset.StandardCharsets.UTF_8;
30
31 import java.net.URI;
32 import java.nio.ByteBuffer;
33 import java.util.Random;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpConnection;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.Message;
43 import org.apache.hc.core5.http.impl.Http1StreamListener;
44 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46 import org.apache.hc.core5.http.message.RequestLine;
47 import org.apache.hc.core5.http.message.StatusLine;
48 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
49 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
52 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
53 import org.apache.hc.core5.reactor.IOReactorConfig;
54 import org.apache.hc.core5.util.Timeout;
55 import org.reactivestreams.Publisher;
56
57 import io.reactivex.Flowable;
58 import io.reactivex.Notification;
59 import io.reactivex.Observable;
60 import io.reactivex.functions.Consumer;
61 import io.reactivex.functions.Function;
62
63
64
65
66
67
68 public class ReactiveFullDuplexClientExample {
69
70 public static void main(final String[] args) throws Exception {
71 String endpoint = "http://localhost:8080/echo";
72 if (args.length >= 1) {
73 endpoint = args[0];
74 }
75
76
77 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
78 .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
79 .setStreamListener(new Http1StreamListener() {
80 @Override
81 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
82 System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request));
83
84 }
85
86 @Override
87 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
88 System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response));
89 }
90
91 @Override
92 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
93 if (keepAlive) {
94 System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)");
95 } else {
96 System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)");
97 }
98 }
99
100 })
101 .create();
102
103 Runtime.getRuntime().addShutdownHook(new Thread() {
104 @Override
105 public void run() {
106 System.out.println("HTTP requester shutting down");
107 requester.close(CloseMode.GRACEFUL);
108 }
109 });
110 requester.start();
111
112 final Random random = new Random();
113 final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
114 .map(new Function<Integer, ByteBuffer>() {
115 @Override
116 public ByteBuffer apply(final Integer ignored) {
117 final String str = random.nextDouble() + "\n";
118 return ByteBuffer.wrap(str.getBytes(UTF_8));
119 }
120 });
121 final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(new URI(endpoint))
122 .setEntity(new ReactiveEntityProducer(publisher, -1, ContentType.TEXT_PLAIN, null))
123 .build();
124
125 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
126 final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
127 final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
128
129 System.out.println(streamingResponse.getHead());
130 for (final Header header : streamingResponse.getHead().getHeaders()) {
131 System.out.println(header);
132 }
133 System.out.println();
134
135 Observable.fromPublisher(streamingResponse.getBody())
136 .map(new Function<ByteBuffer, String>() {
137 @Override
138 public String apply(final ByteBuffer byteBuffer) {
139 final byte[] string = new byte[byteBuffer.remaining()];
140 byteBuffer.get(string);
141 return new String(string);
142 }
143 })
144 .materialize()
145 .forEach(new Consumer<Notification<String>>() {
146 @Override
147 public void accept(final Notification<String> byteBufferNotification) {
148 System.out.println(byteBufferNotification);
149 }
150 });
151
152 responseComplete.get(1, TimeUnit.MINUTES);
153 System.out.println("Shutting down I/O reactor");
154 requester.initiateShutdown();
155 }
156
157 }