View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactive;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayDeque;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.apache.hc.core5.annotation.Contract;
36  import org.apache.hc.core5.annotation.ThreadingBehavior;
37  import org.apache.hc.core5.http.HttpStreamResetException;
38  import org.apache.hc.core5.http.nio.AsyncDataProducer;
39  import org.apache.hc.core5.http.nio.DataStreamChannel;
40  import org.apache.hc.core5.util.Args;
41  import org.reactivestreams.Publisher;
42  import org.reactivestreams.Subscriber;
43  import org.reactivestreams.Subscription;
44  
45  /**
46   * An asynchronous data producer that supports Reactive Streams.
47   *
48   * @since 5.0
49   */
50  @Contract(threading = ThreadingBehavior.SAFE)
51  final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
52  
53      private static final int BUFFER_WINDOW_SIZE = 5;
54  
55      private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
56      private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
57      private final AtomicBoolean complete = new AtomicBoolean(false);
58      private final Publisher<ByteBuffer> publisher;
59      private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
60      private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
61  
62      public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
63          this.publisher = Args.notNull(publisher, "publisher");
64      }
65  
66      void setChannel(final DataStreamChannel channel) {
67          requestChannel.set(channel);
68      }
69  
70      @Override
71      public void onSubscribe(final Subscription subscription) {
72          if (this.subscription.getAndSet(subscription) != null) {
73              throw new IllegalStateException("Already subscribed");
74          }
75  
76          subscription.request(BUFFER_WINDOW_SIZE);
77      }
78  
79      @Override
80      public void onNext(final ByteBuffer byteBuffer) {
81          final byte[] copy = new byte[byteBuffer.remaining()];
82          byteBuffer.get(copy);
83          synchronized (buffers) {
84              buffers.add(ByteBuffer.wrap(copy));
85          }
86          signalReadiness();
87      }
88  
89      @Override
90      public void onError(final Throwable throwable) {
91          subscription.set(null);
92          exception.set(throwable);
93          signalReadiness();
94      }
95  
96      @Override
97      public void onComplete() {
98          subscription.set(null);
99          complete.set(true);
100         signalReadiness();
101     }
102 
103     private void signalReadiness() {
104         final DataStreamChannel channel = requestChannel.get();
105         if (channel == null) {
106             throw new IllegalStateException("Output channel is not set");
107         }
108         channel.requestOutput();
109     }
110 
111     @Override
112     public int available() {
113         if (exception.get() != null || complete.get()) {
114             return 1;
115         } else {
116             synchronized (buffers) {
117                 int sum = 0;
118                 for (final ByteBuffer buffer : buffers) {
119                     sum += buffer.remaining();
120                 }
121                 return sum;
122             }
123         }
124     }
125 
126     @Override
127     public void produce(final DataStreamChannel channel) throws IOException {
128         if (requestChannel.get() == null) {
129             requestChannel.set(channel);
130             publisher.subscribe(this);
131         }
132 
133         final Throwable t = exception.get();
134         final Subscription s = subscription.get();
135         int buffersToReplenish = 0;
136         try {
137             synchronized (buffers) {
138                 if (t != null) {
139                     throw new HttpStreamResetException(t.getMessage(), t);
140                 } else if (this.complete.get() && buffers.isEmpty()) {
141                     channel.endStream();
142                 } else {
143                     while (!buffers.isEmpty()) {
144                         final ByteBuffer nextBuffer = buffers.remove();
145                         channel.write(nextBuffer);
146                         if (nextBuffer.remaining() > 0) {
147                             buffers.push(nextBuffer);
148                             break;
149                         } else if (s != null) {
150                             // We defer the #request call until after we release the buffer lock.
151                             buffersToReplenish++;
152                         }
153                     }
154                 }
155             }
156         } finally {
157             if (s != null && buffersToReplenish > 0) {
158                 s.request(buffersToReplenish);
159             }
160         }
161     }
162 
163     @Override
164     public void releaseResources() {
165         final Subscription s = subscription.getAndSet(null);
166         if (s != null) {
167             s.cancel();
168         }
169     }
170 }