View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.http.Header;
41  import org.apache.hc.core5.http.HttpStreamResetException;
42  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
43  import org.apache.hc.core5.http.nio.CapacityChannel;
44  import org.apache.hc.core5.util.Args;
45  import org.reactivestreams.Publisher;
46  import org.reactivestreams.Subscriber;
47  import org.reactivestreams.Subscription;
48  
49  /**
50   * An asynchronous data consumer that supports Reactive Streams.
51   *
52   * @since 5.0
53   */
54  @Contract(threading = ThreadingBehavior.SAFE)
55  final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
56  
57      private final AtomicLong requests = new AtomicLong(0);
58  
59      private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
60      private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
61      private final Object flushLock = new Object();
62      private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
63      private volatile boolean cancelled;
64      private volatile boolean completed;
65      private volatile Exception exception;
66      private volatile CapacityChannel capacityChannel;
67      private volatile Subscriber<? super ByteBuffer> subscriber;
68  
69      public void failed(final Exception cause) {
70          if (!completed) {
71              exception = cause;
72              flushToSubscriber();
73          }
74      }
75  
76      @Override
77      public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
78          throwIfCancelled();
79          this.capacityChannel = capacityChannel;
80          signalCapacity(capacityChannel);
81      }
82  
83      private void signalCapacity(final CapacityChannel channel) throws IOException {
84          final int increment = windowScalingIncrement.getAndSet(0);
85          if (increment > 0) {
86              channel.update(increment);
87          }
88      }
89  
90      private void throwIfCancelled() throws IOException {
91          if (cancelled) {
92              throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
93          }
94      }
95  
96      @Override
97      public void consume(final ByteBuffer byteBuffer) throws IOException {
98          if (completed) {
99              throw new IllegalStateException("Received data past end of stream");
100         }
101         throwIfCancelled();
102 
103         final byte[] copy = new byte[byteBuffer.remaining()];
104         byteBuffer.get(copy);
105         buffers.add(ByteBuffer.wrap(copy));
106 
107         flushToSubscriber();
108     }
109 
110     @Override
111     public void streamEnd(final List<? extends Header> trailers) {
112         completed = true;
113         flushToSubscriber();
114     }
115 
116     @Override
117     public void releaseResources() {
118         this.capacityChannel = null;
119     }
120 
121     private void flushToSubscriber() {
122         synchronized (flushLock) {
123             final Subscriber<? super ByteBuffer> s = subscriber;
124             if (flushInProgress.getAndSet(true)) {
125                 return;
126             }
127             try {
128                 if (s == null) {
129                     return;
130                 }
131                 if (exception != null) {
132                     subscriber = null;
133                     s.onError(exception);
134                     return;
135                 }
136                 ByteBuffer next;
137                 while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
138                     final int bytesFreed = next.remaining();
139                     s.onNext(next);
140                     requests.decrementAndGet();
141                     windowScalingIncrement.addAndGet(bytesFreed);
142                 }
143                 final CapacityChannel localChannel = capacityChannel;
144                 if (localChannel != null) {
145                     try {
146                         signalCapacity(localChannel);
147                     } catch (final IOException e) {
148                         exception = e;
149                         s.onError(e);
150                         return;
151                     }
152                 }
153                 if (completed && buffers.isEmpty()) {
154                     subscriber = null;
155                     s.onComplete();
156                 }
157             } finally {
158                 flushInProgress.set(false);
159             }
160         }
161     }
162 
163     @Override
164     public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
165         this.subscriber = Args.notNull(subscriber, "subscriber");
166         subscriber.onSubscribe(new Subscription() {
167             @Override
168             public void request(final long increment) {
169                 if (increment <= 0) {
170                     failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
171                     return;
172                 }
173                 requests.addAndGet(increment);
174                 flushToSubscriber();
175             }
176 
177             @Override
178             public void cancel() {
179                 ReactiveDataConsumer.this.cancelled = true;
180                 ReactiveDataConsumer.this.subscriber = null;
181             }
182         });
183     }
184 }