1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Callback;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpRequest;
39 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
40 import org.apache.hc.core5.http.nio.CapacityChannel;
41 import org.apache.hc.core5.http.nio.DataStreamChannel;
42 import org.apache.hc.core5.http.nio.ResponseChannel;
43 import org.apache.hc.core5.http.protocol.HttpContext;
44 import org.reactivestreams.Publisher;
45
46
47
48
49
50
51 public final class ReactiveServerExchangeHandler implements AsyncServerExchangeHandler {
52
53 private final ReactiveRequestProcessor requestProcessor;
54 private final AtomicReference<ReactiveDataProducer> responseProducer = new AtomicReference<>();
55 private final ReactiveDataConsumer requestConsumer;
56 private volatile DataStreamChannel channel;
57
58
59
60
61
62
63
64 public ReactiveServerExchangeHandler(final ReactiveRequestProcessor requestProcessor) {
65 this.requestProcessor = requestProcessor;
66 this.requestConsumer = new ReactiveDataConsumer();
67 }
68
69 @Override
70 public void handleRequest(
71 final HttpRequest request,
72 final EntityDetails entityDetails,
73 final ResponseChannel responseChannel,
74 final HttpContext context
75 ) throws HttpException, IOException {
76 final Callback<Publisher<ByteBuffer>> callback = result -> {
77 final ReactiveDataProducer producer = new ReactiveDataProducer(result);
78 if (channel != null) {
79 producer.setChannel(channel);
80 }
81 responseProducer.set(producer);
82 result.subscribe(producer);
83 };
84 requestProcessor.processRequest(request, entityDetails, responseChannel, context, requestConsumer, callback);
85 }
86
87 @Override
88 public void failed(final Exception cause) {
89 requestConsumer.failed(cause);
90 final ReactiveDataProducer p = responseProducer.get();
91 if (p != null) {
92 p.onError(cause);
93 }
94 }
95
96 @Override
97 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
98 requestConsumer.updateCapacity(capacityChannel);
99 }
100
101 @Override
102 public void consume(final ByteBuffer src) throws IOException {
103 requestConsumer.consume(src);
104 }
105
106 @Override
107 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
108 requestConsumer.streamEnd(trailers);
109 }
110
111 @Override
112 public int available() {
113 final ReactiveDataProducer p = responseProducer.get();
114 if (p == null) {
115 return 0;
116 } else {
117 return p.available();
118 }
119 }
120
121 @Override
122 public void produce(final DataStreamChannel channel) throws IOException {
123 this.channel = channel;
124 final ReactiveDataProducer p = responseProducer.get();
125 if (p != null) {
126 p.produce(channel);
127 }
128 }
129
130 @Override
131 public void releaseResources() {
132 final ReactiveDataProducer p = responseProducer.get();
133 if (p != null) {
134 p.releaseResources();
135 }
136 requestConsumer.releaseResources();
137 }
138 }