View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.List;
34  import java.util.concurrent.Future;
35  
36  import org.apache.hc.core5.annotation.Contract;
37  import org.apache.hc.core5.annotation.ThreadingBehavior;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.FutureCallback;
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.Header;
42  import org.apache.hc.core5.http.HttpResponse;
43  import org.apache.hc.core5.http.Message;
44  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
45  import org.apache.hc.core5.http.nio.CapacityChannel;
46  import org.apache.hc.core5.http.protocol.HttpContext;
47  import org.apache.hc.core5.util.Args;
48  import org.reactivestreams.Publisher;
49  
50  /**
51   * An {@link AsyncResponseConsumer} that publishes the response body through
52   * a {@link Publisher}, as defined by the Reactive Streams specification. The
53   * response is represented as a {@link Message} consisting of a {@link
54   * HttpResponse} representing the headers and a {@link Publisher} representing
55   * the response body as an asynchronous stream of {@link ByteBuffer} instances.
56   *
57   * @since 5.0
58   */
59  @Contract(threading = ThreadingBehavior.SAFE)
60  public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
61  
62      private final ReactiveDataConsumerl#ReactiveDataConsumer">ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
63      private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
64      private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
65  
66      private volatile BasicFuture<Void> responseCompletion;
67      private volatile HttpResponse informationResponse;
68      private volatile EntityDetails entityDetails;
69  
70      /**
71       * Creates a {@code ReactiveResponseConsumer}.
72       */
73      public ReactiveResponseConsumer() {
74          this.responseFuture = new BasicFuture<>(null);
75      }
76  
77      /**
78       * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
79       * streamable response.
80       *
81       * @param responseCallback the callback to invoke when the response is available for consumption.
82       */
83      public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
84          this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
85      }
86  
87      public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
88          return responseFuture;
89      }
90  
91      /**
92       * Returns the intermediate (1xx) HTTP response if one was received.
93       *
94       * @return the information response, or {@code null} if none.
95       */
96      public HttpResponse getInformationResponse() {
97          return informationResponse;
98      }
99  
100     /**
101      * Returns the response entity details.
102      *
103      * @return the entity details, or {@code null} if none.
104      */
105     public EntityDetails getEntityDetails() {
106         return entityDetails;
107     }
108 
109     /**
110      * Returns the trailers received at the end of the response.
111      *
112      * @return a non-null list of zero or more trailers.
113      */
114     public List<Header> getTrailers() {
115         return trailers;
116     }
117 
118     @Override
119     public void consumeResponse(
120         final HttpResponse response,
121         final EntityDetails entityDetails,
122         final HttpContext httpContext,
123         final FutureCallback<Void> resultCallback
124     ) {
125         this.entityDetails = entityDetails;
126         this.responseCompletion = new BasicFuture<>(resultCallback);
127         this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
128         if (entityDetails == null) {
129             streamEnd(null);
130         }
131     }
132 
133     @Override
134     public void informationResponse(final HttpResponse response, final HttpContext httpContext) {
135         this.informationResponse = response;
136     }
137 
138     @Override
139     public void failed(final Exception cause) {
140         reactiveDataConsumer.failed(cause);
141         responseFuture.failed(cause);
142         if (responseCompletion != null) {
143             responseCompletion.failed(cause);
144         }
145     }
146 
147     @Override
148     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
149         reactiveDataConsumer.updateCapacity(capacityChannel);
150     }
151 
152     @Override
153     public void consume(final ByteBuffer src) throws IOException {
154         reactiveDataConsumer.consume(src);
155     }
156 
157     @Override
158     public void streamEnd(final List<? extends Header> trailers) {
159         if (trailers != null) {
160             this.trailers.addAll(trailers);
161         }
162         reactiveDataConsumer.streamEnd(trailers);
163         responseCompletion.completed(null);
164     }
165 
166     @Override
167     public void releaseResources() {
168         reactiveDataConsumer.releaseResources();
169         responseFuture.cancel();
170         if (responseCompletion != null) {
171             responseCompletion.cancel();
172         }
173     }
174 }