001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.support; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Comparator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.ConcurrentMap; 027 import java.util.concurrent.ScheduledExecutorService; 028 import java.util.concurrent.ScheduledFuture; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.locks.Lock; 031 import java.util.concurrent.locks.ReentrantLock; 032 033 import org.apache.camel.TimeoutMap; 034 import org.apache.camel.util.ObjectHelper; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * Default implementation of the {@link TimeoutMap}. 040 * <p/> 041 * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not. 042 * By default locking is enabled and thus we are thread safe. 043 * <p/> 044 * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used 045 * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler 046 * if its being stopped. 047 * You must also invoke {@link #start()} to startup the timeout map, before its ready to be used. 048 * And you must invoke {@link #stop()} to stop the map when no longer in use. 049 * 050 * @version 051 */ 052 public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable { 053 054 protected final Logger log = LoggerFactory.getLogger(getClass()); 055 056 private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>(); 057 private final ScheduledExecutorService executor; 058 private volatile ScheduledFuture future; 059 private final long purgePollTime; 060 private final Lock lock = new ReentrantLock(); 061 private boolean useLock = true; 062 063 public DefaultTimeoutMap(ScheduledExecutorService executor) { 064 this(executor, 1000); 065 } 066 067 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 068 this(executor, requestMapPollTimeMillis, true); 069 } 070 071 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) { 072 ObjectHelper.notNull(executor, "ScheduledExecutorService"); 073 this.executor = executor; 074 this.purgePollTime = requestMapPollTimeMillis; 075 this.useLock = useLock; 076 } 077 078 public V get(K key) { 079 TimeoutMapEntry<K, V> entry; 080 if (useLock) { 081 lock.lock(); 082 } 083 try { 084 entry = map.get(key); 085 if (entry == null) { 086 return null; 087 } 088 updateExpireTime(entry); 089 } finally { 090 if (useLock) { 091 lock.unlock(); 092 } 093 } 094 return entry.getValue(); 095 } 096 097 public void put(K key, V value, long timeoutMillis) { 098 TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis); 099 if (useLock) { 100 lock.lock(); 101 } 102 try { 103 map.put(key, entry); 104 updateExpireTime(entry); 105 } finally { 106 if (useLock) { 107 lock.unlock(); 108 } 109 } 110 } 111 112 public V remove(K key) { 113 TimeoutMapEntry<K, V> entry; 114 115 if (useLock) { 116 lock.lock(); 117 } 118 try { 119 entry = map.remove(key); 120 } finally { 121 if (useLock) { 122 lock.unlock(); 123 } 124 } 125 126 return entry != null ? entry.getValue() : null; 127 } 128 129 public Object[] getKeys() { 130 Object[] keys; 131 if (useLock) { 132 lock.lock(); 133 } 134 try { 135 Set<K> keySet = map.keySet(); 136 keys = new Object[keySet.size()]; 137 keySet.toArray(keys); 138 } finally { 139 if (useLock) { 140 lock.unlock(); 141 } 142 } 143 return keys; 144 } 145 146 public int size() { 147 return map.size(); 148 } 149 150 /** 151 * The timer task which purges old requests and schedules another poll 152 */ 153 public void run() { 154 // only run if allowed 155 if (!isRunAllowed()) { 156 log.trace("Purge task not allowed to run"); 157 return; 158 } 159 160 log.trace("Running purge task to see if any entries has been timed out"); 161 try { 162 purge(); 163 } catch (Throwable t) { 164 // must catch and log exception otherwise the executor will now schedule next run 165 log.warn("Exception occurred during purge task. This exception will be ignored.", t); 166 } 167 } 168 169 public void purge() { 170 log.trace("There are {} in the timeout map", map.size()); 171 if (map.isEmpty()) { 172 return; 173 } 174 175 long now = currentTime(); 176 177 List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>(); 178 179 if (useLock) { 180 lock.lock(); 181 } 182 try { 183 // need to find the expired entries and add to the expired list 184 for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) { 185 if (entry.getValue().getExpireTime() < now) { 186 if (isValidForEviction(entry.getValue())) { 187 log.debug("Evicting inactive entry ID: {}", entry.getValue()); 188 expired.add(entry.getValue()); 189 } 190 } 191 } 192 193 // if we found any expired then we need to sort, onEviction and remove 194 if (!expired.isEmpty()) { 195 // sort according to the expired time so we got the first expired first 196 Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() { 197 public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) { 198 long diff = a.getExpireTime() - b.getExpireTime(); 199 if (diff == 0) { 200 return 0; 201 } 202 return diff > 0 ? 1 : -1; 203 } 204 }); 205 206 List<K> evicts = new ArrayList<K>(expired.size()); 207 try { 208 // now fire eviction notification 209 for (TimeoutMapEntry<K, V> entry : expired) { 210 boolean evict = false; 211 try { 212 evict = onEviction(entry.getKey(), entry.getValue()); 213 } catch (Throwable t) { 214 log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}", 215 entry.getValue(), t); 216 } 217 if (evict) { 218 // okay this entry should be evicted 219 evicts.add(entry.getKey()); 220 } 221 } 222 } finally { 223 // and must remove from list after we have fired the notifications 224 for (K key : evicts) { 225 map.remove(key); 226 } 227 } 228 } 229 } finally { 230 if (useLock) { 231 lock.unlock(); 232 } 233 } 234 } 235 236 // Properties 237 // ------------------------------------------------------------------------- 238 239 public long getPurgePollTime() { 240 return purgePollTime; 241 } 242 243 public ScheduledExecutorService getExecutor() { 244 return executor; 245 } 246 247 // Implementation methods 248 // ------------------------------------------------------------------------- 249 250 /** 251 * lets schedule each time to allow folks to change the time at runtime 252 */ 253 protected void schedulePoll() { 254 future = executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS); 255 } 256 257 /** 258 * A hook to allow derivations to avoid evicting the current entry 259 */ 260 protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) { 261 return true; 262 } 263 264 public boolean onEviction(K key, V value) { 265 return true; 266 } 267 268 protected void updateExpireTime(TimeoutMapEntry<K, V> entry) { 269 long now = currentTime(); 270 entry.setExpireTime(entry.getTimeout() + now); 271 } 272 273 protected long currentTime() { 274 return System.currentTimeMillis(); 275 } 276 277 @Override 278 protected void doStart() throws Exception { 279 if (executor.isShutdown()) { 280 throw new IllegalStateException("The ScheduledExecutorService is shutdown"); 281 } 282 schedulePoll(); 283 } 284 285 @Override 286 protected void doStop() throws Exception { 287 if (future != null) { 288 future.cancel(false); 289 future = null; 290 } 291 // clear map if we stop 292 map.clear(); 293 } 294 295 }