1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.concurrent.FutureCallback;
35 import org.apache.hc.core5.function.Supplier;
36 import org.apache.hc.core5.http.EntityDetails;
37 import org.apache.hc.core5.http.Header;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpRequest;
40 import org.apache.hc.core5.http.Message;
41 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
42 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
43 import org.apache.hc.core5.http.nio.CapacityChannel;
44 import org.apache.hc.core5.http.protocol.HttpContext;
45 import org.apache.hc.core5.util.Args;
46
47
48
49
50
51
52
53 public class BasicRequestConsumer<T> implements AsyncRequestConsumer<Message<HttpRequest, T>> {
54
55 private final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier;
56 private final AtomicReference<AsyncEntityConsumer<T>> dataConsumerRef;
57
58 public BasicRequestConsumer(final Supplier<AsyncEntityConsumer<T>> dataConsumerSupplier) {
59 this.dataConsumerSupplier = Args.notNull(dataConsumerSupplier, "Data consumer supplier");
60 this.dataConsumerRef = new AtomicReference<>(null);
61 }
62
63 public BasicRequestConsumer(final AsyncEntityConsumer<T> dataConsumer) {
64 this(new Supplier<AsyncEntityConsumer<T>>() {
65
66 @Override
67 public AsyncEntityConsumer<T> get() {
68 return dataConsumer;
69 }
70
71 });
72 }
73
74 @Override
75 public void consumeRequest(
76 final HttpRequest request,
77 final EntityDetails entityDetails,
78 final HttpContext httpContext,
79 final FutureCallback<Message<HttpRequest, T>> resultCallback) throws HttpException, IOException {
80 Args.notNull(request, "Request");
81 if (entityDetails != null) {
82 final AsyncEntityConsumer<T> dataConsumer = dataConsumerSupplier.get();
83 if (dataConsumer == null) {
84 throw new HttpException("Supplied data consumer is null");
85 }
86 dataConsumerRef.set(dataConsumer);
87
88 dataConsumer.streamStart(entityDetails, new FutureCallback<T>() {
89
90 @Override
91 public void completed(final T body) {
92 final Message<HttpRequest, T> result = new Message<>(request, body);
93 if (resultCallback != null) {
94 resultCallback.completed(result);
95 }
96 }
97
98 @Override
99 public void failed(final Exception ex) {
100 if (resultCallback != null) {
101 resultCallback.failed(ex);
102 }
103 }
104
105 @Override
106 public void cancelled() {
107 if (resultCallback != null) {
108 resultCallback.cancelled();
109 }
110 }
111
112 });
113 } else {
114 final Message<HttpRequest, T> result = new Message<>(request, null);
115 if (resultCallback != null) {
116 resultCallback.completed(result);
117 }
118 }
119 }
120
121 @Override
122 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
123 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
124 dataConsumer.updateCapacity(capacityChannel);
125 }
126
127 @Override
128 public void consume(final ByteBuffer src) throws IOException {
129 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
130 dataConsumer.consume(src);
131 }
132
133 @Override
134 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
135 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
136 dataConsumer.streamEnd(trailers);
137 }
138
139 @Override
140 public void failed(final Exception cause) {
141 releaseResources();
142 }
143
144 @Override
145 public void releaseResources() {
146 final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.getAndSet(null);
147 if (dataConsumer != null) {
148 dataConsumer.releaseResources();
149 }
150 }
151
152 }