1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.concurrent.Future;
35
36 import org.apache.hc.core5.annotation.Contract;
37 import org.apache.hc.core5.annotation.ThreadingBehavior;
38 import org.apache.hc.core5.concurrent.BasicFuture;
39 import org.apache.hc.core5.concurrent.FutureCallback;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.HttpResponse;
43 import org.apache.hc.core5.http.Message;
44 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
45 import org.apache.hc.core5.http.nio.CapacityChannel;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.util.Args;
48 import org.reactivestreams.Publisher;
49
50
51
52
53
54
55
56
57
58
59 @Contract(threading = ThreadingBehavior.SAFE)
60 public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
61
62 private final ReactiveDataConsumerl#ReactiveDataConsumer">ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
63 private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
64 private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
65
66 private volatile BasicFuture<Void> responseCompletion;
67 private volatile HttpResponse informationResponse;
68 private volatile EntityDetails entityDetails;
69
70
71
72
73 public ReactiveResponseConsumer() {
74 this.responseFuture = new BasicFuture<>(null);
75 }
76
77
78
79
80
81
82
83 public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
84 this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
85 }
86
87 public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
88 return responseFuture;
89 }
90
91
92
93
94
95
96 public HttpResponse getInformationResponse() {
97 return informationResponse;
98 }
99
100
101
102
103
104
105 public EntityDetails getEntityDetails() {
106 return entityDetails;
107 }
108
109
110
111
112
113
114 public List<Header> getTrailers() {
115 return trailers;
116 }
117
118 @Override
119 public void consumeResponse(
120 final HttpResponse response,
121 final EntityDetails entityDetails,
122 final HttpContext httpContext,
123 final FutureCallback<Void> resultCallback
124 ) {
125 this.entityDetails = entityDetails;
126 this.responseCompletion = new BasicFuture<>(resultCallback);
127 this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
128 if (entityDetails == null) {
129 streamEnd(null);
130 }
131 }
132
133 @Override
134 public void informationResponse(final HttpResponse response, final HttpContext httpContext) {
135 this.informationResponse = response;
136 }
137
138 @Override
139 public void failed(final Exception cause) {
140 reactiveDataConsumer.failed(cause);
141 responseFuture.failed(cause);
142 if (responseCompletion != null) {
143 responseCompletion.failed(cause);
144 }
145 }
146
147 @Override
148 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
149 reactiveDataConsumer.updateCapacity(capacityChannel);
150 }
151
152 @Override
153 public void consume(final ByteBuffer src) throws IOException {
154 reactiveDataConsumer.consume(src);
155 }
156
157 @Override
158 public void streamEnd(final List<? extends Header> trailers) {
159 if (trailers != null) {
160 this.trailers.addAll(trailers);
161 }
162 reactiveDataConsumer.streamEnd(trailers);
163 responseCompletion.completed(null);
164 }
165
166 @Override
167 public void releaseResources() {
168 reactiveDataConsumer.releaseResources();
169 responseFuture.cancel();
170 if (responseCompletion != null) {
171 responseCompletion.cancel();
172 }
173 }
174 }