1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30
31 import org.apache.hc.client5.http.HttpRequestRetryStrategy;
32 import org.apache.hc.client5.http.HttpRoute;
33 import org.apache.hc.client5.http.async.AsyncExecCallback;
34 import org.apache.hc.client5.http.async.AsyncExecChain;
35 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
36 import org.apache.hc.client5.http.impl.ChainElement;
37 import org.apache.hc.client5.http.protocol.HttpClientContext;
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.Internal;
40 import org.apache.hc.core5.annotation.ThreadingBehavior;
41 import org.apache.hc.core5.http.EntityDetails;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpRequest;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
46 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
47 import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
48 import org.apache.hc.core5.http.support.BasicRequestBuilder;
49 import org.apache.hc.core5.util.Args;
50 import org.apache.hc.core5.util.TimeValue;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @Contract(threading = ThreadingBehavior.STATELESS)
85 @Internal
86 public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
87
88 private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpRequestRetryExec.class);
89
90 private final HttpRequestRetryStrategy retryStrategy;
91
92 public AsyncHttpRequestRetryExec(final HttpRequestRetryStrategy retryStrategy) {
93 Args.notNull(retryStrategy, "retryStrategy");
94 this.retryStrategy = retryStrategy;
95 }
96
97 private static class State {
98
99 volatile boolean retrying;
100 volatile TimeValue delay;
101
102 }
103
104 private void internalExecute(
105 final State state,
106 final HttpRequest request,
107 final AsyncEntityProducer entityProducer,
108 final AsyncExecChain.Scope scope,
109 final AsyncExecChain chain,
110 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
111
112 final String exchangeId = scope.exchangeId;
113
114 chain.proceed(BasicRequestBuilder.copy(request).build(), entityProducer, scope, new AsyncExecCallback() {
115
116 @Override
117 public AsyncDataConsumer handleResponse(
118 final HttpResponse response,
119 final EntityDetails entityDetails) throws HttpException, IOException {
120 final HttpClientContext clientContext = scope.clientContext;
121 if (entityProducer != null && !entityProducer.isRepeatable()) {
122 if (LOG.isDebugEnabled()) {
123 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
124 }
125 return asyncExecCallback.handleResponse(response, entityDetails);
126 }
127 state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
128 if (state.retrying) {
129 state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("{} retrying request in {}", exchangeId, state.delay);
132 }
133 return new DiscardingEntityConsumer<>();
134 } else {
135 return asyncExecCallback.handleResponse(response, entityDetails);
136 }
137 }
138
139 @Override
140 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
141 asyncExecCallback.handleInformationResponse(response);
142 }
143
144 @Override
145 public void completed() {
146 if (state.retrying) {
147 scope.execCount.incrementAndGet();
148 if (entityProducer != null) {
149 entityProducer.releaseResources();
150 }
151 scope.scheduler.scheduleExecution(
152 request,
153 entityProducer,
154 scope,
155 (r, e, s, c) -> execute(r, e, s, chain, c),
156 asyncExecCallback,
157 state.delay);
158 } else {
159 asyncExecCallback.completed();
160 }
161 }
162
163 @Override
164 public void failed(final Exception cause) {
165 if (cause instanceof IOException) {
166 final HttpRoute route = scope.route;
167 final HttpClientContext clientContext = scope.clientContext;
168 if (entityProducer != null && !entityProducer.isRepeatable()) {
169 if (LOG.isDebugEnabled()) {
170 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
171 }
172 } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
175 }
176 if (LOG.isInfoEnabled()) {
177 LOG.info("Recoverable I/O exception ({}) caught when processing request to {}",
178 cause.getClass().getName(), route);
179 }
180 scope.execRuntime.discardEndpoint();
181 if (entityProducer != null) {
182 entityProducer.releaseResources();
183 }
184 state.retrying = true;
185 final int execCount = scope.execCount.incrementAndGet();
186 state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext);
187 scope.scheduler.scheduleExecution(
188 request,
189 entityProducer,
190 scope,
191 (r, e, s, c) -> execute(r, e, s, chain, c),
192 asyncExecCallback,
193 state.delay);
194 return;
195 }
196 }
197 asyncExecCallback.failed(cause);
198 }
199
200 });
201
202 }
203
204 @Override
205 public void execute(
206 final HttpRequest request,
207 final AsyncEntityProducer entityProducer,
208 final AsyncExecChain.Scope scope,
209 final AsyncExecChain chain,
210 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
211 final State state = new State();
212 state.retrying = false;
213 internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
214 }
215
216 }