1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.UnsupportedEncodingException;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.UnsupportedCharsetException;
34 import java.util.List;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.core5.concurrent.FutureCallback;
39 import org.apache.hc.core5.http.ContentType;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.util.Args;
46
47
48
49
50
51
52
53
54
55
56 public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
57
58 private enum State { IDLE, ACTIVE, COMPLETED }
59
60 private final Executor executor;
61 private final SharedInputBuffer buffer;
62 private final AtomicReference<State> state;
63 private final AtomicReference<T> resultRef;
64 private final AtomicReference<Exception> exceptionRef;
65
66 public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
67 this.executor = Args.notNull(executor, "Executor");
68 this.buffer = new SharedInputBuffer(initialBufferSize);
69 this.state = new AtomicReference<>(State.IDLE);
70 this.resultRef = new AtomicReference<>();
71 this.exceptionRef = new AtomicReference<>();
72 }
73
74
75
76
77
78
79
80
81 protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
82
83 @Override
84 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
85 buffer.updateCapacity(capacityChannel);
86 }
87
88 @Override
89 public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
90 final ContentType contentType;
91 try {
92 contentType = ContentType.parse(entityDetails.getContentType());
93 } catch (final UnsupportedCharsetException ex) {
94 throw new UnsupportedEncodingException(ex.getMessage());
95 }
96 if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
97 executor.execute(() -> {
98 try {
99 final T result = consumeData(contentType, new ContentInputStream(buffer));
100 resultRef.set(result);
101 resultCallback.completed(result);
102 } catch (final Exception ex) {
103 buffer.abort();
104 resultCallback.failed(ex);
105 } finally {
106 state.set(State.COMPLETED);
107 }
108 });
109 }
110 }
111
112 @Override
113 public final void consume(final ByteBuffer src) throws IOException {
114 buffer.fill(src);
115 }
116
117 @Override
118 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
119 buffer.markEndStream();
120 }
121
122 @Override
123 public final void failed(final Exception cause) {
124 if (exceptionRef.compareAndSet(null, cause)) {
125 releaseResources();
126 }
127 }
128
129 public final Exception getException() {
130 return exceptionRef.get();
131 }
132
133 @Override
134 public final T getContent() {
135 return resultRef.get();
136 }
137
138 @Override
139 public void releaseResources() {
140 }
141
142 }