View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.cache;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.HashSet;
32  import java.util.Set;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.ScheduledFuture;
37  import java.util.concurrent.ScheduledThreadPoolExecutor;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
41  import org.apache.hc.client5.http.schedule.SchedulingStrategy;
42  import org.apache.hc.core5.util.Args;
43  import org.apache.hc.core5.util.TimeValue;
44  import org.apache.hc.core5.util.Timeout;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  /**
49   * Abstract cache re-validation class.
50   */
51  class CacheRevalidatorBase implements Closeable {
52  
53      private final ReentrantLock lock;
54  
55      interface ScheduledExecutor {
56  
57          Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
58  
59          void shutdown();
60  
61          void awaitTermination(final Timeout timeout) throws InterruptedException;
62  
63      }
64  
65      public static ScheduledExecutor wrap(final ScheduledExecutorService executorService) {
66  
67          return new ScheduledExecutor() {
68  
69              @Override
70              public ScheduledFuture<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
71                  Args.notNull(command, "Runnable");
72                  Args.notNull(timeValue, "Time value");
73                  return executorService.schedule(command, timeValue.getDuration(), timeValue.getTimeUnit());
74              }
75  
76              @Override
77              public void shutdown() {
78                  executorService.shutdown();
79              }
80  
81              @Override
82              public void awaitTermination(final Timeout timeout) throws InterruptedException {
83                  Args.notNull(timeout, "Timeout");
84                  executorService.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
85              }
86  
87          };
88  
89      }
90  
91      private final ScheduledExecutor scheduledExecutor;
92      private final SchedulingStrategy schedulingStrategy;
93      private final Set<String> pendingRequest;
94      private final ConcurrentCountMap<String> failureCache;
95  
96      private static final Logger LOG = LoggerFactory.getLogger(CacheRevalidatorBase.class);
97  
98      /**
99       * Create CacheValidator which will make ache revalidation requests
100      * using the supplied {@link SchedulingStrategy} and {@link ScheduledExecutor}.
101      */
102     public CacheRevalidatorBase(
103             final ScheduledExecutor scheduledExecutor,
104             final SchedulingStrategy schedulingStrategy) {
105         this.scheduledExecutor = scheduledExecutor;
106         this.schedulingStrategy = schedulingStrategy;
107         this.pendingRequest = new HashSet<>();
108         this.failureCache = new ConcurrentCountMap<>();
109         this.lock = new ReentrantLock();
110     }
111 
112     /**
113      * Create CacheValidator which will make ache revalidation requests
114      * using the supplied {@link SchedulingStrategy} and {@link ScheduledThreadPoolExecutor}.
115      */
116     public CacheRevalidatorBase(
117             final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
118             final SchedulingStrategy schedulingStrategy) {
119         this(wrap(scheduledThreadPoolExecutor), schedulingStrategy);
120     }
121 
122     /**
123      * Schedules an asynchronous re-validation
124      */
125     void scheduleRevalidation(final String cacheKey, final Runnable command) {
126         lock.lock();
127         try {
128             if (!pendingRequest.contains(cacheKey)) {
129                 final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
130                 final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
131                 try {
132                     scheduledExecutor.schedule(command, executionTime);
133                     pendingRequest.add(cacheKey);
134                 } catch (final RejectedExecutionException ex) {
135                     LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
136                 }
137             }
138         } finally {
139             lock.unlock();
140         }
141     }
142 
143     @Override
144     public void close() throws IOException {
145         scheduledExecutor.shutdown();
146     }
147 
148     public void awaitTermination(final Timeout timeout) throws InterruptedException {
149         Args.notNull(timeout, "Timeout");
150         scheduledExecutor.awaitTermination(timeout);
151     }
152 
153     void jobSuccessful(final String identifier) {
154         failureCache.resetCount(identifier);
155         lock.lock();
156         try {
157             pendingRequest.remove(identifier);
158         } finally {
159             lock.unlock();
160         }
161     }
162 
163     void jobFailed(final String identifier) {
164         failureCache.increaseCount(identifier);
165         lock.lock();
166         try {
167             pendingRequest.remove(identifier);
168         } finally {
169             lock.unlock();
170         }
171     }
172 
173     Set<String> getScheduledIdentifiers() {
174         lock.lock();
175         try {
176             return new HashSet<>(pendingRequest);
177         } finally {
178             lock.unlock();
179         }
180     }
181 
182 }