1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.UnsupportedCharsetException;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.concurrent.FutureCallback;
36 import org.apache.hc.core5.function.Supplier;
37 import org.apache.hc.core5.http.ContentType;
38 import org.apache.hc.core5.http.EntityDetails;
39 import org.apache.hc.core5.http.Header;
40 import org.apache.hc.core5.http.HttpException;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
43 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.http.protocol.HttpContext;
46 import org.apache.hc.core5.util.Args;
47
48
49
50
51
52
53
54
55
56
57 public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
58
59 private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
60 private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
61
62 public AbstractAsyncResponseConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
63 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
64 this.dataConsumerRef = new AtomicReference<>(null);
65 }
66
67 public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> dataConsumer) {
68 this(new Supplier<AsyncEntityConsumer<E>>() {
69
70 @Override
71 public AsyncEntityConsumer<E> get() {
72 return dataConsumer;
73 }
74
75 });
76 }
77
78
79
80
81
82
83
84
85 protected abstract T buildResult(HttpResponse response, E entity, ContentType contentType);
86
87 @Override
88 public final void consumeResponse(
89 final HttpResponse response,
90 final EntityDetails entityDetails,
91 final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
92 if (entityDetails != null) {
93 final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
94 if (dataConsumer == null) {
95 throw new HttpException("Supplied data consumer is null");
96 }
97 dataConsumerRef.set(dataConsumer);
98 dataConsumer.streamStart(entityDetails, new FutureCallback<E>() {
99
100 @Override
101 public void completed(final E entity) {
102 final ContentType contentType;
103 try {
104 contentType = ContentType.parse(entityDetails.getContentType());
105 final T result = buildResult(response, entity, contentType);
106 if (resultCallback != null) {
107 resultCallback.completed(result);
108 }
109 } catch (final UnsupportedCharsetException ex) {
110 if (resultCallback != null) {
111 resultCallback.failed(ex);
112 }
113 }
114 }
115
116 @Override
117 public void failed(final Exception ex) {
118 if (resultCallback != null) {
119 resultCallback.failed(ex);
120 }
121 }
122
123 @Override
124 public void cancelled() {
125 if (resultCallback != null) {
126 resultCallback.cancelled();
127 }
128 }
129
130 });
131 } else {
132 final T result = buildResult(response, null, null);
133 if (resultCallback != null) {
134 resultCallback.completed(result);
135 }
136 }
137
138 }
139
140 @Override
141 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
142 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
143 if (dataConsumer != null) {
144 dataConsumer.updateCapacity(capacityChannel);
145 } else {
146 capacityChannel.update(Integer.MAX_VALUE);
147 }
148 }
149
150 @Override
151 public final void consume(final ByteBuffer src) throws IOException {
152 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
153 if (dataConsumer != null) {
154 dataConsumer.consume(src);
155 }
156 }
157
158 @Override
159 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
160 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
161 if (dataConsumer != null) {
162 dataConsumer.streamEnd(trailers);
163 }
164 }
165
166 @Override
167 public final void failed(final Exception cause) {
168 releaseResources();
169 }
170
171 @Override
172 public final void releaseResources() {
173 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
174 if (dataConsumer != null) {
175 dataConsumer.releaseResources();
176 }
177 }
178
179 }