001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.support;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Comparator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentMap;
027    import java.util.concurrent.ScheduledExecutorService;
028    import java.util.concurrent.ScheduledFuture;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.locks.Lock;
031    import java.util.concurrent.locks.ReentrantLock;
032    
033    import org.apache.camel.TimeoutMap;
034    import org.apache.camel.util.ObjectHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Default implementation of the {@link TimeoutMap}.
040     * <p/>
041     * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not.
042     * By default locking is enabled and thus we are thread safe.
043     * <p/>
044     * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
045     * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler
046     * if its being stopped.
047     * You must also invoke {@link #start()} to startup the timeout map, before its ready to be used.
048     * And you must invoke {@link #stop()} to stop the map when no longer in use.
049     *
050     * @version 
051     */
052    public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {
053    
054        protected final Logger log = LoggerFactory.getLogger(getClass());
055    
056        private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
057        private final ScheduledExecutorService executor;
058        private volatile ScheduledFuture future;
059        private final long purgePollTime;
060        private final Lock lock = new ReentrantLock();
061        private boolean useLock = true;
062    
063        public DefaultTimeoutMap(ScheduledExecutorService executor) {
064            this(executor, 1000);
065        }
066    
067        public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
068            this(executor, requestMapPollTimeMillis, true);
069        }
070    
071        public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
072            ObjectHelper.notNull(executor, "ScheduledExecutorService");
073            this.executor = executor;
074            this.purgePollTime = requestMapPollTimeMillis;
075            this.useLock = useLock;
076        }
077    
078        public V get(K key) {
079            TimeoutMapEntry<K, V> entry;
080            if (useLock) {
081                lock.lock();
082            }
083            try {
084                entry = map.get(key);
085                if (entry == null) {
086                    return null;
087                }
088                updateExpireTime(entry);
089            } finally {
090                if (useLock) {
091                    lock.unlock();
092                }
093            }
094            return entry.getValue();
095        }
096    
097        public void put(K key, V value, long timeoutMillis) {
098            TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
099            if (useLock) {
100                lock.lock();
101            }
102            try {
103                map.put(key, entry);
104                updateExpireTime(entry);
105            } finally {
106                if (useLock) {
107                    lock.unlock();
108                }
109            }
110        }
111    
112        public V remove(K key) {
113            TimeoutMapEntry<K, V> entry;
114    
115            if (useLock) {
116                lock.lock();
117            }
118            try {
119                entry = map.remove(key);
120            } finally {
121                if (useLock) {
122                    lock.unlock();
123                }
124            }
125    
126            return entry != null ? entry.getValue() : null;
127        }
128    
129        public Object[] getKeys() {
130            Object[] keys;
131            if (useLock) {
132                lock.lock();
133            }
134            try {
135                Set<K> keySet = map.keySet();
136                keys = new Object[keySet.size()];
137                keySet.toArray(keys);
138            } finally {
139                if (useLock) {
140                    lock.unlock();
141                }
142            }
143            return keys;
144        }
145        
146        public int size() {
147            return map.size();
148        }
149    
150        /**
151         * The timer task which purges old requests and schedules another poll
152         */
153        public void run() {
154            // only run if allowed
155            if (!isRunAllowed()) {
156                log.trace("Purge task not allowed to run");
157                return;
158            }
159    
160            log.trace("Running purge task to see if any entries has been timed out");
161            try {
162                purge();
163            } catch (Throwable t) {
164                // must catch and log exception otherwise the executor will now schedule next run
165                log.warn("Exception occurred during purge task. This exception will be ignored.", t);
166            }
167        }
168    
169        public void purge() {
170            log.trace("There are {} in the timeout map", map.size());
171            if (map.isEmpty()) {
172                return;
173            }
174            
175            long now = currentTime();
176    
177            List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>();
178    
179            if (useLock) {
180                lock.lock();
181            }
182            try {
183                // need to find the expired entries and add to the expired list
184                for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
185                    if (entry.getValue().getExpireTime() < now) {
186                        if (isValidForEviction(entry.getValue())) {
187                            log.debug("Evicting inactive entry ID: {}", entry.getValue());
188                            expired.add(entry.getValue());
189                        }
190                    }
191                }
192    
193                // if we found any expired then we need to sort, onEviction and remove
194                if (!expired.isEmpty()) {
195                    // sort according to the expired time so we got the first expired first
196                    Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
197                        public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
198                            long diff = a.getExpireTime() - b.getExpireTime();
199                            if (diff == 0) {
200                                return 0;
201                            }
202                            return diff > 0 ? 1 : -1;
203                        }
204                    });
205    
206                    List<K> evicts = new ArrayList<K>(expired.size());
207                    try {
208                        // now fire eviction notification
209                        for (TimeoutMapEntry<K, V> entry : expired) {
210                            boolean evict = false;
211                            try {
212                                evict = onEviction(entry.getKey(), entry.getValue());
213                            } catch (Throwable t) {
214                                log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}", 
215                                        entry.getValue(), t);
216                            }
217                            if (evict) {
218                                // okay this entry should be evicted
219                                evicts.add(entry.getKey());
220                            }
221                        }
222                    } finally {
223                        // and must remove from list after we have fired the notifications
224                        for (K key : evicts) {
225                            map.remove(key);
226                        }
227                    }
228                }
229            } finally {
230                if (useLock) {
231                    lock.unlock();
232                }
233            }
234        }
235    
236        // Properties
237        // -------------------------------------------------------------------------
238        
239        public long getPurgePollTime() {
240            return purgePollTime;
241        }
242    
243        public ScheduledExecutorService getExecutor() {
244            return executor;
245        }
246    
247        // Implementation methods
248        // -------------------------------------------------------------------------
249    
250        /**
251         * lets schedule each time to allow folks to change the time at runtime
252         */
253        protected void schedulePoll() {
254            future = executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS);
255        }
256    
257        /**
258         * A hook to allow derivations to avoid evicting the current entry
259         */
260        protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) {
261            return true;
262        }
263    
264        public boolean onEviction(K key, V value) {
265            return true;
266        }
267    
268        protected void updateExpireTime(TimeoutMapEntry<K, V> entry) {
269            long now = currentTime();
270            entry.setExpireTime(entry.getTimeout() + now);
271        }
272    
273        protected long currentTime() {
274            return System.currentTimeMillis();
275        }
276    
277        @Override
278        protected void doStart() throws Exception {
279            if (executor.isShutdown()) {
280                throw new IllegalStateException("The ScheduledExecutorService is shutdown");
281            }
282            schedulePoll();
283        }
284    
285        @Override
286        protected void doStop() throws Exception {
287            if (future != null) {
288                future.cancel(false);
289                future = null;
290            }
291            // clear map if we stop
292            map.clear();
293        }
294    
295    }