1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.http.Header;
41 import org.apache.hc.core5.http.HttpStreamResetException;
42 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
43 import org.apache.hc.core5.http.nio.CapacityChannel;
44 import org.apache.hc.core5.util.Args;
45 import org.reactivestreams.Publisher;
46 import org.reactivestreams.Subscriber;
47 import org.reactivestreams.Subscription;
48
49
50
51
52
53
54 @Contract(threading = ThreadingBehavior.SAFE)
55 final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
56
57 private final AtomicLong requests = new AtomicLong(0);
58
59 private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
60 private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
61 private final Object flushLock = new Object();
62 private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
63 private volatile boolean cancelled;
64 private volatile boolean completed;
65 private volatile Exception exception;
66 private volatile CapacityChannel capacityChannel;
67 private volatile Subscriber<? super ByteBuffer> subscriber;
68
69 public void failed(final Exception cause) {
70 if (!completed) {
71 exception = cause;
72 flushToSubscriber();
73 }
74 }
75
76 @Override
77 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
78 throwIfCancelled();
79 this.capacityChannel = capacityChannel;
80 signalCapacity(capacityChannel);
81 }
82
83 private void signalCapacity(final CapacityChannel channel) throws IOException {
84 final int increment = windowScalingIncrement.getAndSet(0);
85 if (increment > 0) {
86 channel.update(increment);
87 }
88 }
89
90 private void throwIfCancelled() throws IOException {
91 if (cancelled) {
92 throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
93 }
94 }
95
96 @Override
97 public void consume(final ByteBuffer byteBuffer) throws IOException {
98 if (completed) {
99 throw new IllegalStateException("Received data past end of stream");
100 }
101 throwIfCancelled();
102
103 final byte[] copy = new byte[byteBuffer.remaining()];
104 byteBuffer.get(copy);
105 buffers.add(ByteBuffer.wrap(copy));
106
107 flushToSubscriber();
108 }
109
110 @Override
111 public void streamEnd(final List<? extends Header> trailers) {
112 completed = true;
113 flushToSubscriber();
114 }
115
116 @Override
117 public void releaseResources() {
118 this.capacityChannel = null;
119 }
120
121 private void flushToSubscriber() {
122 synchronized (flushLock) {
123 final Subscriber<? super ByteBuffer> s = subscriber;
124 if (flushInProgress.getAndSet(true)) {
125 return;
126 }
127 try {
128 if (s == null) {
129 return;
130 }
131 if (exception != null) {
132 subscriber = null;
133 s.onError(exception);
134 return;
135 }
136 ByteBuffer next;
137 while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
138 final int bytesFreed = next.remaining();
139 s.onNext(next);
140 requests.decrementAndGet();
141 windowScalingIncrement.addAndGet(bytesFreed);
142 }
143 final CapacityChannel localChannel = capacityChannel;
144 if (localChannel != null) {
145 try {
146 signalCapacity(localChannel);
147 } catch (final IOException e) {
148 exception = e;
149 s.onError(e);
150 return;
151 }
152 }
153 if (completed && buffers.isEmpty()) {
154 subscriber = null;
155 s.onComplete();
156 }
157 } finally {
158 flushInProgress.set(false);
159 }
160 }
161 }
162
163 @Override
164 public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
165 this.subscriber = Args.notNull(subscriber, "subscriber");
166 subscriber.onSubscribe(new Subscription() {
167 @Override
168 public void request(final long increment) {
169 if (increment <= 0) {
170 failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
171 return;
172 }
173 requests.addAndGet(increment);
174 flushToSubscriber();
175 }
176
177 @Override
178 public void cancel() {
179 ReactiveDataConsumer.this.cancelled = true;
180 ReactiveDataConsumer.this.subscriber = null;
181 }
182 });
183 }
184 }