View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.benchmark;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.nio.charset.Charset;
32  import java.nio.charset.CharsetDecoder;
33  import java.nio.charset.StandardCharsets;
34  import java.util.List;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.atomic.AtomicLong;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import org.apache.hc.core5.concurrent.FutureCallback;
40  import org.apache.hc.core5.http.ContentType;
41  import org.apache.hc.core5.http.EntityDetails;
42  import org.apache.hc.core5.http.Header;
43  import org.apache.hc.core5.http.HeaderElements;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpHeaders;
46  import org.apache.hc.core5.http.HttpHost;
47  import org.apache.hc.core5.http.HttpResponse;
48  import org.apache.hc.core5.http.HttpStatus;
49  import org.apache.hc.core5.http.Method;
50  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
51  import org.apache.hc.core5.http.message.BasicHeader;
52  import org.apache.hc.core5.http.message.BasicHttpRequest;
53  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
54  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
55  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
56  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
57  import org.apache.hc.core5.http.nio.CapacityChannel;
58  import org.apache.hc.core5.http.nio.DataStreamChannel;
59  import org.apache.hc.core5.http.nio.RequestChannel;
60  import org.apache.hc.core5.http.nio.ResourceHolder;
61  import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
62  import org.apache.hc.core5.http.nio.entity.FileEntityProducer;
63  import org.apache.hc.core5.http.protocol.HttpContext;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  
66  class BenchmarkWorker implements ResourceHolder {
67  
68      private final HttpAsyncRequester requester;
69      private final HttpHost host;
70      private final HttpCoreContext context;
71      private final AtomicLong requestCount;
72      private final CountDownLatch completionLatch;
73      private final Stats stats;
74      private final BenchmarkConfig config;
75      private final AtomicReference<AsyncClientEndpoint> endpointRef;
76  
77      public BenchmarkWorker(
78              final HttpAsyncRequester requester,
79              final HttpHost host,
80              final HttpCoreContext context,
81              final AtomicLong requestCount,
82              final CountDownLatch completionLatch,
83              final Stats stats,
84              final BenchmarkConfig config) {
85          this.requester = requester;
86          this.host = host;
87          this.context = context;
88          this.requestCount = requestCount;
89          this.completionLatch = completionLatch;
90          this.stats = stats;
91          this.config = config;
92          this.endpointRef = new AtomicReference<>(null);
93      }
94  
95      private AsyncRequestProducer createRequestProducer() {
96          String method = config.getMethod();
97          if (method == null) {
98              method = config.isHeadInsteadOfGet() ? Method.HEAD.name() : Method.GET.name();
99          }
100 
101         final BasicHttpRequesticHttpRequest.html#BasicHttpRequest">BasicHttpRequest request = new BasicHttpRequest(method, config.getUri());
102         final String[] headers = config.getHeaders();
103         if (headers != null) {
104             for (final String s : headers) {
105                 final int pos = s.indexOf(':');
106                 if (pos != -1) {
107                     request.addHeader(new BasicHeader(s.substring(0, pos).trim(), s.substring(pos + 1)));
108                 }
109             }
110         }
111         if (!config.isKeepAlive() && !config.isForceHttp2()) {
112             request.addHeader(new BasicHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE));
113         }
114         if (config.isUseAcceptGZip()) {
115             request.addHeader(new BasicHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"));
116         }
117         if (config.getSoapAction() != null && config.getSoapAction().length() > 0) {
118             request.addHeader(new BasicHeader("SOAPAction", config.getSoapAction()));
119         }
120 
121         final AsyncEntityProducer entityProducer;
122         if (config.getPayloadFile() != null) {
123             entityProducer = new FileEntityProducer(
124                     config.getPayloadFile(),
125                     config.getContentType(),
126                     config.isUseChunking());
127         } else if (config.getPayloadText() != null) {
128             entityProducer = new BasicAsyncEntityProducer(
129                     config.getPayloadText(),
130                     config.getContentType(),
131                     config.isUseChunking());
132         } else {
133             entityProducer = null;
134         }
135 
136         return new AsyncRequestProducer() {
137 
138             @Override
139             public void sendRequest(
140                     final RequestChannel channel,
141                     final HttpContext context) throws HttpException, IOException {
142                 channel.sendRequest(request, entityProducer, context);
143             }
144 
145             @Override
146             public boolean isRepeatable() {
147                 return entityProducer == null || entityProducer.isRepeatable();
148             }
149 
150             @Override
151             public int available() {
152                 return entityProducer != null ? entityProducer.available() : 0;
153             }
154 
155             @Override
156             public void produce(final DataStreamChannel channel) throws IOException {
157                 if (entityProducer != null) {
158                     entityProducer.produce(channel);
159                 }
160             }
161 
162             @Override
163             public void failed(final Exception cause) {
164                 if (config.getVerbosity() >= 1) {
165                     System.out.println("Failed HTTP request: " + cause.getMessage());
166                 }
167             }
168 
169             @Override
170             public void releaseResources() {
171                 if (entityProducer != null) {
172                     entityProducer.releaseResources();
173                 }
174             }
175 
176         };
177     }
178 
179     private AsyncResponseConsumer<Void> createResponseConsumer() {
180 
181         return new AsyncResponseConsumer<Void>() {
182 
183             volatile int status;
184             volatile Charset charset;
185             final AtomicLong contentLength = new AtomicLong();
186             final AtomicReference<FutureCallback<Void>> resultCallbackRef = new AtomicReference<>(null);
187 
188             @Override
189             public void consumeResponse(
190                     final HttpResponse response,
191                     final EntityDetails entityDetails,
192                     final HttpContext context,
193                     final FutureCallback<Void> resultCallback) throws HttpException, IOException {
194                 status = response.getCode();
195                 resultCallbackRef.set(resultCallback);
196                 stats.setVersion(response.getVersion());
197                 final Header serverHeader = response.getFirstHeader(HttpHeaders.SERVER);
198                 if (serverHeader != null) {
199                     stats.setServerName(serverHeader.getValue());
200                 }
201                 if (config.getVerbosity() >= 2) {
202                     System.out.println(response.getCode());
203                 }
204                 if (entityDetails != null) {
205                     if (config.getVerbosity() >= 6) {
206                         if (entityDetails.getContentType() != null) {
207                             final ContentType contentType = ContentType.parseLenient(entityDetails.getContentType());
208                             charset = contentType.getCharset();
209                         }
210                     }
211                 } else {
212                     streamEnd(null);
213                 }
214             }
215 
216             @Override
217             public void informationResponse(
218                     final HttpResponse response,
219                     final HttpContext context) throws HttpException, IOException {
220             }
221 
222             @Override
223             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
224                 capacityChannel.update(Integer.MAX_VALUE);
225             }
226 
227             @Override
228             public void consume(final ByteBuffer src) throws IOException {
229                 final int n = src.remaining();
230                 contentLength.addAndGet(n);
231                 stats.incTotalContentLength(n);
232                 if (config.getVerbosity() >= 6) {
233                     final CharsetDecoder decoder = (charset != null ? charset : StandardCharsets.US_ASCII).newDecoder();
234                     System.out.print(decoder.decode(src));
235                 }
236             }
237 
238             @Override
239             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
240                 if (status == HttpStatus.SC_OK) {
241                     stats.incSuccessCount();
242                 } else {
243                     stats.incFailureCount();
244                 }
245                 stats.setContentLength(contentLength.get());
246                 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
247                 if (resultCallback != null) {
248                     resultCallback.completed(null);
249                 }
250                 if (config.getVerbosity() >= 6) {
251                     System.out.println();
252                     System.out.println();
253                 }
254             }
255 
256             @Override
257             public void failed(final Exception cause) {
258                 stats.incFailureCount();
259                 final FutureCallback<Void> resultCallback = resultCallbackRef.getAndSet(null);
260                 if (resultCallback != null) {
261                     resultCallback.failed(cause);
262                 }
263                 if (config.getVerbosity() >= 1) {
264                     System.out.println("HTTP response error: " + cause.getMessage());
265                 }
266             }
267 
268             @Override
269             public void releaseResources() {
270             }
271 
272         };
273     }
274 
275     public void execute() {
276         if (requestCount.decrementAndGet() >= 0) {
277             AsyncClientEndpoint endpoint = endpointRef.get();
278             if (endpoint != null && !endpoint.isConnected()) {
279                 endpoint.releaseAndDiscard();
280                 endpoint = null;
281             }
282             if (endpoint == null) {
283                 requester.connect(host, config.getSocketTimeout(), null, new FutureCallback<AsyncClientEndpoint>() {
284 
285                     @Override
286                     public void completed(final AsyncClientEndpoint endpoint) {
287                         endpointRef.set(endpoint);
288                         endpoint.execute(
289                                 createRequestProducer(),
290                                 createResponseConsumer(),
291                                 context,
292                                 new FutureCallback<Void>() {
293 
294                                     @Override
295                                     public void completed(final Void result) {
296                                         execute();
297                                     }
298 
299                                     @Override
300                                     public void failed(final Exception cause) {
301                                         execute();
302                                     }
303 
304                                     @Override
305                                     public void cancelled() {
306                                         completionLatch.countDown();
307                                     }
308 
309                                 });
310                     }
311 
312                     @Override
313                     public void failed(final Exception cause) {
314                         stats.incFailureCount();
315                         if (config.getVerbosity() >= 1) {
316                             System.out.println("Connect error: " + cause.getMessage());
317                         }
318                         execute();
319                     }
320 
321                     @Override
322                     public void cancelled() {
323                         completionLatch.countDown();
324                     }
325 
326                 });
327             } else {
328                 stats.incKeepAliveCount();
329                 endpoint.execute(
330                         createRequestProducer(),
331                         createResponseConsumer(),
332                         context,
333                         new FutureCallback<Void>() {
334 
335                             @Override
336                             public void completed(final Void result) {
337                                 execute();
338                             }
339 
340                             @Override
341                             public void failed(final Exception cause) {
342                                 execute();
343                             }
344 
345                             @Override
346                             public void cancelled() {
347                                 completionLatch.countDown();
348                             }
349 
350                         });
351             }
352         } else {
353             completionLatch.countDown();
354         }
355     }
356 
357     @Override
358     public void releaseResources() {
359         final AsyncClientEndpoint endpoint = endpointRef.getAndSet(null);
360         if (endpoint != null) {
361             endpoint.releaseAndDiscard();
362         }
363     }
364 
365 }