001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.idempotent;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.api.management.ManagedAttribute;
022    import org.apache.camel.api.management.ManagedOperation;
023    import org.apache.camel.api.management.ManagedResource;
024    import org.apache.camel.spi.IdempotentRepository;
025    import org.apache.camel.support.ServiceSupport;
026    import org.apache.camel.util.LRUCache;
027    
028    /**
029     * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 
030     * <p/>
031     * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
032     * memory leak.
033     *
034     * @version 
035     */
036    @ManagedResource(description = "Memory based idempotent repository")
037    public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
038        private Map<String, Object> cache;
039        private int cacheSize;
040    
041        public MemoryIdempotentRepository() {
042            this.cache = new LRUCache<String, Object>(1000);
043        }
044    
045        public MemoryIdempotentRepository(Map<String, Object> set) {
046            this.cache = set;
047        }
048    
049        /**
050         * Creates a new memory based repository using a {@link LRUCache}
051         * with a default of 1000 entries in the cache.
052         */
053        public static IdempotentRepository<String> memoryIdempotentRepository() {
054            return new MemoryIdempotentRepository();
055        }
056    
057        /**
058         * Creates a new memory based repository using a {@link LRUCache}.
059         *
060         * @param cacheSize  the cache size
061         */
062        public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) {
063            return memoryIdempotentRepository(new LRUCache<String, Object>(cacheSize));
064        }
065    
066        /**
067         * Creates a new memory based repository using the given {@link Map} to
068         * use to store the processed message ids.
069         * <p/>
070         * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
071         * memory leak.
072         *
073         * @param cache  the cache
074         */
075        public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) {
076            return new MemoryIdempotentRepository(cache);
077        }
078    
079        @ManagedOperation(description = "Adds the key to the store")
080        public boolean add(String key) {
081            synchronized (cache) {
082                if (cache.containsKey(key)) {
083                    return false;
084                } else {
085                    cache.put(key, key);
086                    return true;
087                }
088            }
089        }
090    
091        @ManagedOperation(description = "Does the store contain the given key")
092        public boolean contains(String key) {
093            synchronized (cache) {
094                return cache.containsKey(key);
095            }
096        }
097    
098        @ManagedOperation(description = "Remove the key from the store")
099        public boolean remove(String key) {
100            synchronized (cache) {
101                return cache.remove(key) != null;
102            }
103        }
104    
105        public boolean confirm(String key) {
106            // noop
107            return true;
108        }
109    
110        public Map<String, Object> getCache() {
111            return cache;
112        }
113    
114        @ManagedAttribute(description = "The current cache size")
115        public int getCacheSize() {
116            return cache.size();
117        }
118    
119        public void setCacheSize(int cacheSize) {
120            this.cacheSize = cacheSize;
121        }
122    
123        @Override
124        protected void doStart() throws Exception {
125            if (cacheSize > 0) {
126                cache = new LRUCache<String, Object>(cacheSize);
127            }
128        }
129    
130        @Override
131        protected void doStop() throws Exception {
132            cache.clear();
133        }
134    }