1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.concurrent.FutureCallback;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpResponse;
39 import org.apache.hc.core5.http.HttpStatus;
40 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
41 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
42 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
43 import org.apache.hc.core5.http.nio.CapacityChannel;
44 import org.apache.hc.core5.http.nio.DataStreamChannel;
45 import org.apache.hc.core5.http.nio.RequestChannel;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.util.Args;
48
49
50
51
52
53
54
55
56 public final class BasicClientExchangeHandler<T> implements AsyncClientExchangeHandler {
57
58 private final AsyncRequestProducer requestProducer;
59 private final AsyncResponseConsumer<T> responseConsumer;
60 private final AtomicBoolean completed;
61 private final FutureCallback<T> resultCallback;
62 private final AtomicBoolean outputTerminated;
63
64 public BasicClientExchangeHandler(
65 final AsyncRequestProducer requestProducer,
66 final AsyncResponseConsumer<T> responseConsumer,
67 final FutureCallback<T> resultCallback) {
68 this.requestProducer = Args.notNull(requestProducer, "Request producer");
69 this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
70 this.completed = new AtomicBoolean(false);
71 this.resultCallback = resultCallback;
72 this.outputTerminated = new AtomicBoolean(false);
73 }
74
75 @Override
76 public void produceRequest(final RequestChannel requestChannel, final HttpContext httpContext) throws HttpException, IOException {
77 requestProducer.sendRequest(requestChannel, httpContext);
78 }
79
80 @Override
81 public int available() {
82 return requestProducer.available();
83 }
84
85 @Override
86 public void produce(final DataStreamChannel channel) throws IOException {
87 if (outputTerminated.get()) {
88 channel.endStream();
89 return;
90 }
91 requestProducer.produce(channel);
92 }
93
94 @Override
95 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
96 responseConsumer.informationResponse(response, httpContext);
97 }
98
99 @Override
100 public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
101 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
102 outputTerminated.set(true);
103 requestProducer.releaseResources();
104 }
105 responseConsumer.consumeResponse(response, entityDetails, httpContext, new FutureCallback<T>() {
106
107 @Override
108 public void completed(final T result) {
109 if (completed.compareAndSet(false, true)) {
110 try {
111 if (resultCallback != null) {
112 resultCallback.completed(result);
113 }
114 } finally {
115 internalReleaseResources();
116 }
117 }
118 }
119
120 @Override
121 public void failed(final Exception ex) {
122 if (completed.compareAndSet(false, true)) {
123 try {
124 if (resultCallback != null) {
125 resultCallback.failed(ex);
126 }
127 } finally {
128 internalReleaseResources();
129 }
130 }
131 }
132
133 @Override
134 public void cancelled() {
135 if (completed.compareAndSet(false, true)) {
136 try {
137 if (resultCallback != null) {
138 resultCallback.cancelled();
139 }
140 } finally {
141 internalReleaseResources();
142 }
143 }
144 }
145
146 });
147 }
148
149 @Override
150 public void cancel() {
151 if (completed.compareAndSet(false, true)) {
152 try {
153 if (resultCallback != null) {
154 resultCallback.cancelled();
155 }
156 } finally {
157 internalReleaseResources();
158 }
159 }
160 }
161
162 @Override
163 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
164 responseConsumer.updateCapacity(capacityChannel);
165 }
166
167 @Override
168 public void consume(final ByteBuffer src) throws IOException {
169 responseConsumer.consume(src);
170 }
171
172 @Override
173 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
174 responseConsumer.streamEnd(trailers);
175 }
176
177 @Override
178 public void failed(final Exception cause) {
179 try {
180 requestProducer.failed(cause);
181 responseConsumer.failed(cause);
182 } finally {
183 if (completed.compareAndSet(false, true)) {
184 try {
185 if (resultCallback != null) {
186 resultCallback.failed(cause);
187 }
188 } finally {
189 internalReleaseResources();
190 }
191 }
192 }
193 }
194
195 private void internalReleaseResources() {
196 requestProducer.releaseResources();
197 responseConsumer.releaseResources();
198 }
199
200 @Override
201 public void releaseResources() {
202 }
203
204 }