1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.concurrent.CallbackContribution;
35 import org.apache.hc.core5.concurrent.FutureCallback;
36 import org.apache.hc.core5.function.Supplier;
37 import org.apache.hc.core5.http.EntityDetails;
38 import org.apache.hc.core5.http.Header;
39 import org.apache.hc.core5.http.HttpException;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.Message;
42 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
43 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.http.protocol.HttpContext;
46 import org.apache.hc.core5.util.Args;
47
48
49
50
51
52
53
54
55 public class BasicResponseConsumer<T> implements AsyncResponseConsumer<Message<HttpResponse, T>> {
56
57 private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
58 private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
59
60 public BasicResponseConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
61 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
62 this.dataConsumerRef = new AtomicReference<>();
63 }
64
65 public BasicResponseConsumer(final AsyncEntityConsumer<T> dataConsumer) {
66 this(new Supplier<AsyncEntityConsumer<T>>() {
67
68 @Override
69 public AsyncEntityConsumer<T> get() {
70 return dataConsumer;
71 }
72
73 });
74 }
75
76 @Override
77 public void consumeResponse(
78 final HttpResponse response,
79 final EntityDetails entityDetails,
80 final HttpContext httpContext, final FutureCallback<Message<HttpResponse, T>> resultCallback) throws HttpException, IOException {
81 Args.notNull(response, "Response");
82
83 if (entityDetails != null) {
84 final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
85 if (dataConsumer == null) {
86 throw new HttpException("Supplied data consumer is null");
87 }
88 dataConsumerRef.set(dataConsumer);
89 dataConsumer.streamStart(entityDetails, new CallbackContribution<T>(resultCallback) {
90
91 @Override
92 public void completed(final T body) {
93 final Message<HttpResponse, T> result = new Message<>(response, body);
94 if (resultCallback != null) {
95 resultCallback.completed(result);
96 }
97 }
98
99 });
100 } else {
101 final Message<HttpResponse, T> result = new Message<>(response, null);
102 if (resultCallback != null) {
103 resultCallback.completed(result);
104 }
105 }
106 }
107
108 @Override
109 public void informationResponse(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
110 }
111
112 @Override
113 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
114 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
115 dataConsumer.updateCapacity(capacityChannel);
116 }
117
118 @Override
119 public void consume(final ByteBuffer src) throws IOException {
120 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
121 dataConsumer.consume(src);
122 }
123
124 @Override
125 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
126 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
127 dataConsumer.streamEnd(trailers);
128 }
129
130 @Override
131 public void failed(final Exception cause) {
132 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
133 if (dataConsumer != null) {
134 dataConsumer.failed(cause);
135 }
136 releaseResources();
137 }
138
139 @Override
140 public void releaseResources() {
141 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
142 if (dataConsumer != null) {
143 dataConsumer.releaseResources();
144 }
145 }
146
147 }