1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayDeque;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicReference;
34 import java.util.concurrent.locks.ReentrantLock;
35
36 import org.apache.hc.core5.annotation.Contract;
37 import org.apache.hc.core5.annotation.ThreadingBehavior;
38 import org.apache.hc.core5.http.HttpStreamResetException;
39 import org.apache.hc.core5.http.nio.AsyncDataProducer;
40 import org.apache.hc.core5.http.nio.DataStreamChannel;
41 import org.apache.hc.core5.util.Args;
42 import org.reactivestreams.Publisher;
43 import org.reactivestreams.Subscriber;
44 import org.reactivestreams.Subscription;
45
46
47
48
49
50
51 @Contract(threading = ThreadingBehavior.SAFE)
52 final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
53
54 private static final int BUFFER_WINDOW_SIZE = 5;
55
56 private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
57 private final AtomicReference<Throwable> exception = new AtomicReference<>();
58 private final AtomicBoolean complete = new AtomicBoolean(false);
59 private final Publisher<ByteBuffer> publisher;
60 private final AtomicReference<Subscription> subscription = new AtomicReference<>();
61 private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>();
62
63 private final ReentrantLock lock;
64
65 public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
66 this.publisher = Args.notNull(publisher, "publisher");
67 this.lock = new ReentrantLock();
68 }
69
70 void setChannel(final DataStreamChannel channel) {
71 requestChannel.set(channel);
72 }
73
74 @Override
75 public void onSubscribe(final Subscription subscription) {
76 if (this.subscription.getAndSet(subscription) != null) {
77 throw new IllegalStateException("Already subscribed");
78 }
79
80 subscription.request(BUFFER_WINDOW_SIZE);
81 }
82
83 @Override
84 public void onNext(final ByteBuffer byteBuffer) {
85 final byte[] copy = new byte[byteBuffer.remaining()];
86 byteBuffer.get(copy);
87 lock.lock();
88 try {
89 buffers.add(ByteBuffer.wrap(copy));
90 } finally {
91 lock.unlock();
92 }
93 signalReadiness();
94 }
95
96 @Override
97 public void onError(final Throwable throwable) {
98 subscription.set(null);
99 exception.set(throwable);
100 signalReadiness();
101 }
102
103 @Override
104 public void onComplete() {
105 subscription.set(null);
106 complete.set(true);
107 signalReadiness();
108 }
109
110 private void signalReadiness() {
111 final DataStreamChannel channel = requestChannel.get();
112 if (channel == null) {
113 throw new IllegalStateException("Output channel is not set");
114 }
115 channel.requestOutput();
116 }
117
118 @Override
119 public int available() {
120 if (exception.get() != null || complete.get()) {
121 return 1;
122 } else {
123 lock.lock();
124 try {
125 int sum = 0;
126 for (final ByteBuffer buffer : buffers) {
127 sum += buffer.remaining();
128 }
129 return sum;
130 } finally {
131 lock.unlock();
132 }
133 }
134 }
135
136 @Override
137 public void produce(final DataStreamChannel channel) throws IOException {
138 if (requestChannel.get() == null) {
139 requestChannel.set(channel);
140 publisher.subscribe(this);
141 }
142
143 final Throwable t = exception.get();
144 final Subscription s = subscription.get();
145 int buffersToReplenish = 0;
146 try {
147 lock.lock();
148 try {
149 if (t != null) {
150 throw new HttpStreamResetException(t.getMessage(), t);
151 } else if (this.complete.get() && buffers.isEmpty()) {
152 channel.endStream();
153 } else {
154 while (!buffers.isEmpty()) {
155 final ByteBuffer nextBuffer = buffers.remove();
156 channel.write(nextBuffer);
157 if (nextBuffer.remaining() > 0) {
158 buffers.push(nextBuffer);
159 break;
160 } else if (s != null) {
161
162 buffersToReplenish++;
163 }
164 }
165 }
166 } finally {
167 lock.unlock();
168 }
169 } finally {
170 if (s != null && buffersToReplenish > 0) {
171 s.request(buffersToReplenish);
172 }
173 }
174 }
175
176 @Override
177 public void releaseResources() {
178 final Subscription s = subscription.getAndSet(null);
179 if (s != null) {
180 s.cancel();
181 }
182 }
183 }