1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.UnsupportedCharsetException;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.core5.concurrent.CallbackContribution;
36 import org.apache.hc.core5.concurrent.FutureCallback;
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.EntityDetails;
40 import org.apache.hc.core5.http.Header;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpRequest;
43 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
44 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
45 import org.apache.hc.core5.http.nio.CapacityChannel;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.util.Args;
48
49
50
51
52
53
54
55
56
57
58 public abstract class AbstractAsyncRequesterConsumer<T, E> implements AsyncRequestConsumer<T> {
59
60 private final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier;
61 private final AtomicReference<AsyncEntityConsumer<E>> dataConsumerRef;
62
63 public AbstractAsyncRequesterConsumer(final Supplier<AsyncEntityConsumer<E>> dataConsumerSupplier) {
64 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
65 this.dataConsumerRef = new AtomicReference<>();
66 }
67
68 public AbstractAsyncRequesterConsumer(final AsyncEntityConsumer<E> dataConsumer) {
69 this(() -> dataConsumer);
70 }
71
72
73
74
75
76
77
78
79 protected abstract T buildResult(HttpRequest request, E entity, ContentType contentType);
80
81 @Override
82 public final void consumeRequest(
83 final HttpRequest request,
84 final EntityDetails entityDetails,
85 final HttpContext httpContext, final FutureCallback<T> resultCallback) throws HttpException, IOException {
86 if (entityDetails != null) {
87 final AsyncEntityConsumer<E> dataConsumer = dataConsumerSupplier.get();
88 if (dataConsumer == null) {
89 throw new HttpException("Supplied data consumer is null");
90 }
91 dataConsumerRef.set(dataConsumer);
92 dataConsumer.streamStart(entityDetails, new CallbackContribution<E>(resultCallback) {
93
94 @Override
95 public void completed(final E entity) {
96 final ContentType contentType;
97 try {
98 contentType = ContentType.parse(entityDetails.getContentType());
99 final T result = buildResult(request, entity, contentType);
100 resultCallback.completed(result);
101 } catch (final UnsupportedCharsetException ex) {
102 resultCallback.failed(ex);
103 }
104 }
105
106 });
107 } else {
108 resultCallback.completed(buildResult(request, null, null));
109 }
110
111 }
112
113 @Override
114 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
115 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
116 dataConsumer.updateCapacity(capacityChannel);
117 }
118
119 @Override
120 public final void consume(final ByteBuffer src) throws IOException {
121 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
122 dataConsumer.consume(src);
123 }
124
125 @Override
126 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
127 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
128 dataConsumer.streamEnd(trailers);
129 }
130
131 @Override
132 public final void failed(final Exception cause) {
133 releaseResources();
134 }
135
136 @Override
137 public final void releaseResources() {
138 final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.getAndSet(null);
139 if (dataConsumer != null) {
140 dataConsumer.releaseResources();
141 }
142 }
143
144 }