1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.benchmark;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.Charset;
32 import java.nio.charset.CharsetDecoder;
33 import java.nio.charset.StandardCharsets;
34 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.core5.concurrent.FutureCallback;
40 import org.apache.hc.core5.http.ContentType;
41 import org.apache.hc.core5.http.EntityDetails;
42 import org.apache.hc.core5.http.Header;
43 import org.apache.hc.core5.http.HeaderElements;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpHeaders;
46 import org.apache.hc.core5.http.HttpHost;
47 import org.apache.hc.core5.http.HttpResponse;
48 import org.apache.hc.core5.http.HttpStatus;
49 import org.apache.hc.core5.http.Method;
50 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
51 import org.apache.hc.core5.http.message.BasicHeader;
52 import org.apache.hc.core5.http.message.BasicHttpRequest;
53 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
55 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
56 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
57 import org.apache.hc.core5.http.nio.CapacityChannel;
58 import org.apache.hc.core5.http.nio.DataStreamChannel;
59 import org.apache.hc.core5.http.nio.RequestChannel;
60 import org.apache.hc.core5.http.nio.ResourceHolder;
61 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
62 import org.apache.hc.core5.http.nio.entity.FileEntityProducer;
63 import org.apache.hc.core5.http.protocol.HttpContext;
64 import org.apache.hc.core5.http.protocol.HttpCoreContext;
65
66 class BenchmarkWorker implements ResourceHolder {
67
68 private final HttpAsyncRequester requester;
69 private final HttpHost host;
70 private final HttpCoreContext context;
71 private final AtomicLong requestCount;
72 private final CountDownLatch completionLatch;
73 private final Stats stats;
74 private final BenchmarkConfig config;
75 private final AtomicReference<AsyncClientEndpoint> endpointRef;
76
77 public BenchmarkWorker(
78 final HttpAsyncRequester requester,
79 final HttpHost host,
80 final HttpCoreContext context,
81 final AtomicLong requestCount,
82 final CountDownLatch completionLatch,
83 final Stats stats,
84 final BenchmarkConfig config) {
85 this.requester = requester;
86 this.host = host;
87 this.context = context;
88 this.requestCount = requestCount;
89 this.completionLatch = completionLatch;
90 this.stats = stats;
91 this.config = config;
92 this.endpointRef = new AtomicReference<>(null);
93 }
94
95 private AsyncRequestProducer createRequestProducer() {
96 String method = config.getMethod();
97 if (method == null) {
98 method = config.isHeadInsteadOfGet() ? Method.HEAD.name() : Method.GET.name();
99 }
100
101 final BasicHttpRequesticHttpRequest.html#BasicHttpRequest">BasicHttpRequest request = new BasicHttpRequest(method, config.getUri());
102 final String[] headers = config.getHeaders();
103 if (headers != null) {
104 for (final String s : headers) {
105 final int pos = s.indexOf(':');
106 if (pos != -1) {
107 request.addHeader(new BasicHeader(s.substring(0, pos).trim(), s.substring(pos + 1)));
108 }
109 }
110 }
111 if (!config.isKeepAlive() && !config.isForceHttp2()) {
112 request.addHeader(new BasicHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE));
113 }
114 if (config.isUseAcceptGZip()) {
115 request.addHeader(new BasicHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"));
116 }
117 if (config.getSoapAction() != null && config.getSoapAction().length() > 0) {
118 request.addHeader(new BasicHeader("SOAPAction", config.getSoapAction()));
119 }
120
121 final AsyncEntityProducer entityProducer;
122 if (config.getPayloadFile() != null) {
123 entityProducer = new FileEntityProducer(
124 config.getPayloadFile(),
125 config.getContentType(),
126 config.isUseChunking());
127 } else if (config.getPayloadText() != null) {
128 entityProducer = new BasicAsyncEntityProducer(
129 config.getPayloadText(),
130 config.getContentType(),
131 config.isUseChunking());
132 } else {
133 entityProducer = null;
134 }
135
136 return new AsyncRequestProducer() {
137
138 @Override
139 public void sendRequest(
140 final RequestChannel channel,
141 final HttpContext context) throws HttpException, IOException {
142 channel.sendRequest(request, entityProducer, context);
143 }
144
145 @Override
146 public boolean isRepeatable() {
147 return entityProducer == null || entityProducer.isRepeatable();
148 }
149
150 @Override
151 public int available() {
152 return entityProducer != null ? entityProducer.available() : 0;
153 }
154
155 @Override
156 public void produce(final DataStreamChannel channel) throws IOException {
157 if (entityProducer != null) {
158 entityProducer.produce(channel);
159 }
160 }
161
162 @Override
163 public void failed(final Exception cause) {
164 if (config.getVerbosity() >= 1) {
165 System.out.println("Failed HTTP request: " + cause.getMessage());
166 }
167 }
168
169 @Override
170 public void releaseResources() {
171 if (entityProducer != null) {
172 entityProducer.releaseResources();
173 }
174 }
175
176 };
177 }
178
179 private AsyncResponseConsumer<Void> createResponseConsumer() {
180
181 return new AsyncResponseConsumer<Void>() {
182
183 volatile int status;
184 volatile Charset charset;
185 final AtomicLong contentLength = new AtomicLong();
186 final AtomicReference<FutureCallback<Void>> resultCallbackRef = new AtomicReference<>(null);
187
188 @Override
189 public void consumeResponse(
190 final HttpResponse response,
191 final EntityDetails entityDetails,
192 final HttpContext context,
193 final FutureCallback<Void> resultCallback) throws HttpException, IOException {
194 status = response.getCode();
195 resultCallbackRef.set(resultCallback);
196 stats.setVersion(response.getVersion());
197 final Header serverHeader = response.getFirstHeader(HttpHeaders.SERVER);
198 if (serverHeader != null) {
199 stats.setServerName(serverHeader.getValue());
200 }
201 if (config.getVerbosity() >= 2) {
202 System.out.println(response.getCode());
203 }
204 if (entityDetails != null) {
205 if (config.getVerbosity() >= 6) {
206 if (entityDetails.getContentType() != null) {
207 final ContentType contentType = ContentType.parseLenient(entityDetails.getContentType());
208 charset = contentType.getCharset();
209 }
210 }
211 } else {
212 streamEnd(null);
213 }
214 }
215
216 @Override
217 public void informationResponse(
218 final HttpResponse response,
219 final HttpContext context) throws HttpException, IOException {
220 }
221
222 @Override
223 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
224 capacityChannel.update(Integer.MAX_VALUE);
225 }
226
227 @Override
228 public void consume(final ByteBuffer src) throws IOException {
229 final int n = src.remaining();
230 contentLength.addAndGet(n);
231 stats.incTotalContentLength(n);
232 if (config.getVerbosity() >= 6) {
233 final CharsetDecoder decoder = (charset != null ? charset : StandardCharsets.US_ASCII).newDecoder();
234 System.out.print(decoder.decode(src));
235 }
236 }
237
238 @Override
239 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
240 if (status == HttpStatus.SC_OK) {
241 stats.incSuccessCount();
242 } else {
243 stats.incFailureCount();
244 }
245 stats.setContentLength(contentLength.get());
246 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
247 if (resultCallback != null) {
248 resultCallback.completed(null);
249 }
250 if (config.getVerbosity() >= 6) {
251 System.out.println();
252 System.out.println();
253 }
254 }
255
256 @Override
257 public void failed(final Exception cause) {
258 stats.incFailureCount();
259 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
260 if (resultCallback != null) {
261 resultCallback.failed(cause);
262 }
263 if (config.getVerbosity() >= 1) {
264 System.out.println("HTTP response error: " + cause.getMessage());
265 }
266 }
267
268 @Override
269 public void releaseResources() {
270 }
271
272 };
273 }
274
275 public void execute() {
276 if (requestCount.decrementAndGet() >= 0) {
277 AsyncClientEndpoint endpoint = endpointRef.get();
278 if (endpoint != null && !endpoint.isConnected()) {
279 endpoint.releaseAndDiscard();
280 endpoint = null;
281 }
282 if (endpoint == null) {
283 requester.connect(host, config.getSocketTimeout(), null, new FutureCallback<AsyncClientEndpoint>() {
284
285 @Override
286 public void completed(final AsyncClientEndpoint endpoint) {
287 endpointRef.set(endpoint);
288 endpoint.execute(
289 createRequestProducer(),
290 createResponseConsumer(),
291 context,
292 new FutureCallback<Void>() {
293
294 @Override
295 public void completed(final Void result) {
296 execute();
297 }
298
299 @Override
300 public void failed(final Exception cause) {
301 execute();
302 }
303
304 @Override
305 public void cancelled() {
306 completionLatch.countDown();
307 }
308
309 });
310 }
311
312 @Override
313 public void failed(final Exception cause) {
314 stats.incFailureCount();
315 if (config.getVerbosity() >= 1) {
316 System.out.println("Connect error: " + cause.getMessage());
317 }
318 execute();
319 }
320
321 @Override
322 public void cancelled() {
323 completionLatch.countDown();
324 }
325
326 });
327 } else {
328 stats.incKeepAliveCount();
329 endpoint.execute(
330 createRequestProducer(),
331 createResponseConsumer(),
332 context,
333 new FutureCallback<Void>() {
334
335 @Override
336 public void completed(final Void result) {
337 execute();
338 }
339
340 @Override
341 public void failed(final Exception cause) {
342 execute();
343 }
344
345 @Override
346 public void cancelled() {
347 completionLatch.countDown();
348 }
349
350 });
351 }
352 } else {
353 completionLatch.countDown();
354 }
355 }
356
357 @Override
358 public void releaseResources() {
359 final AsyncClientEndpoint endpoint = endpointRef.getAndSet(null);
360 if (endpoint != null) {
361 endpoint.releaseAndDiscard();
362 }
363 }
364
365 }