1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.cache;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.HashSet;
32 import java.util.Set;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
41 import org.apache.hc.client5.http.schedule.SchedulingStrategy;
42 import org.apache.hc.core5.util.Args;
43 import org.apache.hc.core5.util.TimeValue;
44 import org.apache.hc.core5.util.Timeout;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48
49
50
51 class CacheRevalidatorBase implements Closeable {
52
53 private final ReentrantLock lock;
54
55 interface ScheduledExecutor {
56
57 Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
58
59 void shutdown();
60
61 void awaitTermination(final Timeout timeout) throws InterruptedException;
62
63 }
64
65 public static ScheduledExecutor wrap(final ScheduledExecutorService executorService) {
66
67 return new ScheduledExecutor() {
68
69 @Override
70 public ScheduledFuture<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
71 Args.notNull(command, "Runnable");
72 Args.notNull(timeValue, "Time value");
73 return executorService.schedule(command, timeValue.getDuration(), timeValue.getTimeUnit());
74 }
75
76 @Override
77 public void shutdown() {
78 executorService.shutdown();
79 }
80
81 @Override
82 public void awaitTermination(final Timeout timeout) throws InterruptedException {
83 Args.notNull(timeout, "Timeout");
84 executorService.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
85 }
86
87 };
88
89 }
90
91 private final ScheduledExecutor scheduledExecutor;
92 private final SchedulingStrategy schedulingStrategy;
93 private final Set<String> pendingRequest;
94 private final ConcurrentCountMap<String> failureCache;
95
96 private static final Logger LOG = LoggerFactory.getLogger(CacheRevalidatorBase.class);
97
98
99
100
101
102 public CacheRevalidatorBase(
103 final ScheduledExecutor scheduledExecutor,
104 final SchedulingStrategy schedulingStrategy) {
105 this.scheduledExecutor = scheduledExecutor;
106 this.schedulingStrategy = schedulingStrategy;
107 this.pendingRequest = new HashSet<>();
108 this.failureCache = new ConcurrentCountMap<>();
109 this.lock = new ReentrantLock();
110 }
111
112
113
114
115
116 public CacheRevalidatorBase(
117 final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
118 final SchedulingStrategy schedulingStrategy) {
119 this(wrap(scheduledThreadPoolExecutor), schedulingStrategy);
120 }
121
122
123
124
125 void scheduleRevalidation(final String cacheKey, final Runnable command) {
126 lock.lock();
127 try {
128 if (!pendingRequest.contains(cacheKey)) {
129 final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
130 final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
131 try {
132 scheduledExecutor.schedule(command, executionTime);
133 pendingRequest.add(cacheKey);
134 } catch (final RejectedExecutionException ex) {
135 LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
136 }
137 }
138 } finally {
139 lock.unlock();
140 }
141 }
142
143 @Override
144 public void close() throws IOException {
145 scheduledExecutor.shutdown();
146 }
147
148 public void awaitTermination(final Timeout timeout) throws InterruptedException {
149 Args.notNull(timeout, "Timeout");
150 scheduledExecutor.awaitTermination(timeout);
151 }
152
153 void jobSuccessful(final String identifier) {
154 failureCache.resetCount(identifier);
155 lock.lock();
156 try {
157 pendingRequest.remove(identifier);
158 } finally {
159 lock.unlock();
160 }
161 }
162
163 void jobFailed(final String identifier) {
164 failureCache.increaseCount(identifier);
165 lock.lock();
166 try {
167 pendingRequest.remove(identifier);
168 } finally {
169 lock.unlock();
170 }
171 }
172
173 Set<String> getScheduledIdentifiers() {
174 lock.lock();
175 try {
176 return new HashSet<>(pendingRequest);
177 } finally {
178 lock.unlock();
179 }
180 }
181
182 }