1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.Set;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39
40 import org.apache.hc.client5.http.cache.HeaderConstants;
41 import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
42 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.util.Args;
46 import org.apache.hc.core5.util.TimeValue;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51
52
53
54 class CacheRevalidatorBase implements Closeable {
55
56 interface ScheduledExecutor {
57
58 Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
59
60 void shutdown();
61
62 void awaitTermination(final Timeout timeout) throws InterruptedException;
63
64 }
65
66 public static ScheduledExecutor wrap(final ScheduledExecutorService executorService) {
67
68 return new ScheduledExecutor() {
69
70 @Override
71 public ScheduledFuture<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
72 Args.notNull(command, "Runnable");
73 Args.notNull(timeValue, "Time value");
74 return executorService.schedule(command, timeValue.getDuration(), timeValue.getTimeUnit());
75 }
76
77 @Override
78 public void shutdown() {
79 executorService.shutdown();
80 }
81
82 @Override
83 public void awaitTermination(final Timeout timeout) throws InterruptedException {
84 Args.notNull(timeout, "Timeout");
85 executorService.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
86 }
87
88 };
89
90 }
91
92 private final ScheduledExecutor scheduledExecutor;
93 private final SchedulingStrategy schedulingStrategy;
94 private final Set<String> pendingRequest;
95 private final ConcurrentCountMap<String> failureCache;
96
97 private static final Logger LOG = LoggerFactory.getLogger(CacheRevalidatorBase.class);
98
99
100
101
102
103 public CacheRevalidatorBase(
104 final ScheduledExecutor scheduledExecutor,
105 final SchedulingStrategy schedulingStrategy) {
106 this.scheduledExecutor = scheduledExecutor;
107 this.schedulingStrategy = schedulingStrategy;
108 this.pendingRequest = new HashSet<>();
109 this.failureCache = new ConcurrentCountMap<>();
110 }
111
112
113
114
115
116 public CacheRevalidatorBase(
117 final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
118 final SchedulingStrategy schedulingStrategy) {
119 this(wrap(scheduledThreadPoolExecutor), schedulingStrategy);
120 }
121
122
123
124
125 void scheduleRevalidation(final String cacheKey, final Runnable command) {
126 synchronized (pendingRequest) {
127 if (!pendingRequest.contains(cacheKey)) {
128 final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
129 final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
130 try {
131 scheduledExecutor.schedule(command, executionTime);
132 pendingRequest.add(cacheKey);
133 } catch (final RejectedExecutionException ex) {
134 LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
135 }
136 }
137 }
138 }
139
140 @Override
141 public void close() throws IOException {
142 scheduledExecutor.shutdown();
143 }
144
145 public void awaitTermination(final Timeout timeout) throws InterruptedException {
146 Args.notNull(timeout, "Timeout");
147 scheduledExecutor.awaitTermination(timeout);
148 }
149
150 void jobSuccessful(final String identifier) {
151 failureCache.resetCount(identifier);
152 synchronized (pendingRequest) {
153 pendingRequest.remove(identifier);
154 }
155 }
156
157 void jobFailed(final String identifier) {
158 failureCache.increaseCount(identifier);
159 synchronized (pendingRequest) {
160 pendingRequest.remove(identifier);
161 }
162 }
163
164 Set<String> getScheduledIdentifiers() {
165 synchronized (pendingRequest) {
166 return new HashSet<>(pendingRequest);
167 }
168 }
169
170
171
172
173
174
175 boolean isStale(final HttpResponse httpResponse) {
176 for (final Iterator<Header> it = httpResponse.headerIterator(HeaderConstants.WARNING); it.hasNext(); ) {
177
178
179
180
181
182 final Header warning = it.next();
183 final String warningValue = warning.getValue();
184 if (warningValue.startsWith("110") || warningValue.startsWith("111")) {
185 return true;
186 }
187 }
188 return false;
189 }
190
191 }