1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.nio.support.classic;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.UnsupportedEncodingException;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.UnsupportedCharsetException;
34 import java.util.List;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.core5.concurrent.FutureCallback;
39 import org.apache.hc.core5.http.ContentType;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.Header;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
44 import org.apache.hc.core5.http.nio.CapacityChannel;
45 import org.apache.hc.core5.util.Args;
46
47
48
49
50
51
52
53
54
55
56 public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
57
58 private enum State { IDLE, ACTIVE, COMPLETED }
59
60 private final Executor executor;
61 private final SharedInputBuffer buffer;
62 private final AtomicReference<State> state;
63 private final AtomicReference<T> resultRef;
64 private final AtomicReference<Exception> exceptionRef;
65
66 public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
67 this.executor = Args.notNull(executor, "Executor");
68 this.buffer = new SharedInputBuffer(initialBufferSize);
69 this.state = new AtomicReference<>(State.IDLE);
70 this.resultRef = new AtomicReference<>(null);
71 this.exceptionRef = new AtomicReference<>(null);
72 }
73
74
75
76
77
78
79
80
81 protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
82
83 @Override
84 public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
85 buffer.updateCapacity(capacityChannel);
86 }
87
88 @Override
89 public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
90 final ContentType contentType;
91 try {
92 contentType = ContentType.parse(entityDetails.getContentType());
93 } catch (final UnsupportedCharsetException ex) {
94 throw new UnsupportedEncodingException(ex.getMessage());
95 }
96 if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
97 executor.execute(new Runnable() {
98
99 @Override
100 public void run() {
101 try {
102 final T result = consumeData(contentType, new ContentInputStream(buffer));
103 resultRef.set(result);
104 resultCallback.completed(result);
105 } catch (final Exception ex) {
106 buffer.abort();
107 resultCallback.failed(ex);
108 } finally {
109 state.set(State.COMPLETED);
110 }
111 }
112
113 });
114 }
115 }
116
117 @Override
118 public final void consume(final ByteBuffer src) throws IOException {
119 buffer.fill(src);
120 }
121
122 @Override
123 public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
124 buffer.markEndStream();
125 }
126
127 @Override
128 public final void failed(final Exception cause) {
129 if (exceptionRef.compareAndSet(null, cause)) {
130 releaseResources();
131 }
132 }
133
134 public final Exception getException() {
135 return exceptionRef.get();
136 }
137
138 @Override
139 public final T getContent() {
140 return resultRef.get();
141 }
142
143 @Override
144 public void releaseResources() {
145 }
146
147 }