1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Callback;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpRequest;
39 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
40 import org.apache.hc.core5.http.nio.CapacityChannel;
41 import org.apache.hc.core5.http.nio.DataStreamChannel;
42 import org.apache.hc.core5.http.nio.ResponseChannel;
43 import org.apache.hc.core5.http.protocol.HttpContext;
44 import org.reactivestreams.Publisher;
45
46
47
48
49
50
51 public final class ReactiveServerExchangeHandler implements AsyncServerExchangeHandler {
52
53 private final ReactiveRequestProcessor requestProcessor;
54 private final AtomicReference<ReactiveDataProducer> responseProducer = new AtomicReference<>(null);
55 private final ReactiveDataConsumer requestConsumer;
56 private volatile DataStreamChannel channel;
57
58
59
60
61
62
63
64 public ReactiveServerExchangeHandler(final ReactiveRequestProcessor requestProcessor) {
65 this.requestProcessor = requestProcessor;
66 this.requestConsumer = new ReactiveDataConsumer();
67 }
68
69 @Override
70 public void handleRequest(
71 final HttpRequest request,
72 final EntityDetails entityDetails,
73 final ResponseChannel responseChannel,
74 final HttpContext context
75 ) throws HttpException, IOException {
76 final Callback<Publisher<ByteBuffer>> callback = new Callback<Publisher<ByteBuffer>>() {
77 @Override
78 public void execute(final Publisher<ByteBuffer> result) {
79 final ReactiveDataProducerProducer.html#ReactiveDataProducer">ReactiveDataProducer producer = new ReactiveDataProducer(result);
80 if (channel != null) {
81 producer.setChannel(channel);
82 }
83 responseProducer.set(producer);
84 result.subscribe(producer);
85 }
86 };
87 requestProcessor.processRequest(request, entityDetails, responseChannel, context, requestConsumer, callback);
88 }
89
90 @Override
91 public void failed(final Exception cause) {
92 requestConsumer.failed(cause);
93 final ReactiveDataProducer p = responseProducer.get();
94 if (p != null) {
95 p.onError(cause);
96 }
97 }
98
99 @Override
100 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
101 requestConsumer.updateCapacity(capacityChannel);
102 }
103
104 @Override
105 public void consume(final ByteBuffer src) throws IOException {
106 requestConsumer.consume(src);
107 }
108
109 @Override
110 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
111 requestConsumer.streamEnd(trailers);
112 }
113
114 @Override
115 public int available() {
116 final ReactiveDataProducer p = responseProducer.get();
117 if (p == null) {
118 return 0;
119 } else {
120 return p.available();
121 }
122 }
123
124 @Override
125 public void produce(final DataStreamChannel channel) throws IOException {
126 this.channel = channel;
127 final ReactiveDataProducer p = responseProducer.get();
128 if (p != null) {
129 p.produce(channel);
130 }
131 }
132
133 @Override
134 public void releaseResources() {
135 final ReactiveDataProducer p = responseProducer.get();
136 if (p != null) {
137 p.releaseResources();
138 }
139 requestConsumer.releaseResources();
140 }
141 }