View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayDeque;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicReference;
34  import java.util.concurrent.locks.ReentrantLock;
35  
36  import org.apache.hc.core5.annotation.Contract;
37  import org.apache.hc.core5.annotation.ThreadingBehavior;
38  import org.apache.hc.core5.http.HttpStreamResetException;
39  import org.apache.hc.core5.http.nio.AsyncDataProducer;
40  import org.apache.hc.core5.http.nio.DataStreamChannel;
41  import org.apache.hc.core5.util.Args;
42  import org.reactivestreams.Publisher;
43  import org.reactivestreams.Subscriber;
44  import org.reactivestreams.Subscription;
45  
46  /**
47   * An asynchronous data producer that supports Reactive Streams.
48   *
49   * @since 5.0
50   */
51  @Contract(threading = ThreadingBehavior.SAFE)
52  final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
53  
54      private static final int BUFFER_WINDOW_SIZE = 5;
55  
56      private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
57      private final AtomicReference<Throwable> exception = new AtomicReference<>();
58      private final AtomicBoolean complete = new AtomicBoolean(false);
59      private final Publisher<ByteBuffer> publisher;
60      private final AtomicReference<Subscription> subscription = new AtomicReference<>();
61      private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
62  
63      private final ReentrantLock lock;
64  
65      public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
66          this.publisher = Args.notNull(publisher, "publisher");
67          this.lock = new ReentrantLock();
68      }
69  
70      void setChannel(final DataStreamChannel channel) {
71          requestChannel.set(channel);
72      }
73  
74      @Override
75      public void onSubscribe(final Subscription subscription) {
76          if (this.subscription.getAndSet(subscription) != null) {
77              throw new IllegalStateException("Already subscribed");
78          }
79  
80          subscription.request(BUFFER_WINDOW_SIZE);
81      }
82  
83      @Override
84      public void onNext(final ByteBuffer byteBuffer) {
85          final byte[] copy = new byte[byteBuffer.remaining()];
86          byteBuffer.get(copy);
87          lock.lock();
88          try {
89              buffers.add(ByteBuffer.wrap(copy));
90          } finally {
91              lock.unlock();
92          }
93          signalReadiness();
94      }
95  
96      @Override
97      public void onError(final Throwable throwable) {
98          subscription.set(null);
99          exception.set(throwable);
100         signalReadiness();
101     }
102 
103     @Override
104     public void onComplete() {
105         subscription.set(null);
106         complete.set(true);
107         signalReadiness();
108     }
109 
110     private void signalReadiness() {
111         final DataStreamChannel channel = requestChannel.get();
112         if (channel == null) {
113             throw new IllegalStateException("Output channel is not set");
114         }
115         channel.requestOutput();
116     }
117 
118     @Override
119     public int available() {
120         if (exception.get() != null || complete.get()) {
121             return 1;
122         } else {
123             lock.lock();
124             try {
125                 int sum = 0;
126                 for (final ByteBuffer buffer : buffers) {
127                     sum += buffer.remaining();
128                 }
129                 return sum;
130             } finally {
131                 lock.unlock();
132             }
133         }
134     }
135 
136     @Override
137     public void produce(final DataStreamChannel channel) throws IOException {
138         if (requestChannel.get() == null) {
139             requestChannel.set(channel);
140             publisher.subscribe(this);
141         }
142 
143         final Throwable t = exception.get();
144         final Subscription s = subscription.get();
145         int buffersToReplenish = 0;
146         try {
147             lock.lock();
148             try {
149                 if (t != null) {
150                     throw new HttpStreamResetException(t.getMessage(), t);
151                 } else if (this.complete.get() && buffers.isEmpty()) {
152                     channel.endStream();
153                 } else {
154                     while (!buffers.isEmpty()) {
155                         final ByteBuffer nextBuffer = buffers.remove();
156                         channel.write(nextBuffer);
157                         if (nextBuffer.remaining() > 0) {
158                             buffers.push(nextBuffer);
159                             break;
160                         } else if (s != null) {
161                             // We defer the #request call until after we release the buffer lock.
162                             buffersToReplenish++;
163                         }
164                     }
165                 }
166             } finally {
167                 lock.unlock();
168             }
169         } finally {
170             if (s != null && buffersToReplenish > 0) {
171                 s.request(buffersToReplenish);
172             }
173         }
174     }
175 
176     @Override
177     public void releaseResources() {
178         final Subscription s = subscription.getAndSet(null);
179         if (s != null) {
180             s.cancel();
181         }
182     }
183 }