001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.idempotent;
018    
019    import java.io.File;
020    import java.io.FileOutputStream;
021    import java.io.IOException;
022    import java.util.Map;
023    import java.util.Scanner;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import org.apache.camel.api.management.ManagedAttribute;
027    import org.apache.camel.api.management.ManagedOperation;
028    import org.apache.camel.api.management.ManagedResource;
029    import org.apache.camel.spi.IdempotentRepository;
030    import org.apache.camel.support.ServiceSupport;
031    import org.apache.camel.util.FileUtil;
032    import org.apache.camel.util.IOHelper;
033    import org.apache.camel.util.LRUCache;
034    import org.apache.camel.util.ObjectHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
040     * <p/>
041     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
042     * memory leak.
043     *
044     * @version 
045     */
046    @ManagedResource(description = "File based idempotent repository")
047    public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
048        private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
049        private static final String STORE_DELIMITER = "\n";
050        private Map<String, Object> cache;
051        private File fileStore;
052        private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
053        private AtomicBoolean init = new AtomicBoolean();
054    
055        public FileIdempotentRepository() {
056            // default use a 1st level cache 
057            this.cache = new LRUCache<String, Object>(1000);
058        }
059    
060        public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
061            this.fileStore = fileStore;
062            this.cache = set;
063        }
064    
065        /**
066         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
067         * as 1st level cache with a default of 1000 entries in the cache.
068         *
069         * @param fileStore  the file store
070         */
071        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
072            return fileIdempotentRepository(fileStore, 1000);
073        }
074    
075        /**
076         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
077         * as 1st level cache.
078         *
079         * @param fileStore  the file store
080         * @param cacheSize  the cache size
081         */
082        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
083            return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
084        }
085    
086        /**
087         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
088         * as 1st level cache.
089         *
090         * @param fileStore  the file store
091         * @param cacheSize  the cache size
092         * @param maxFileStoreSize  the max size in bytes for the filestore file 
093         */
094        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
095            FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
096            repository.setMaxFileStoreSize(maxFileStoreSize);
097            return repository;
098        }
099    
100        /**
101         * Creates a new file based repository using the given {@link java.util.Map}
102         * as 1st level cache.
103         * <p/>
104         * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
105         * memory leak.
106         *
107         * @param store  the file store
108         * @param cache  the cache to use as 1st level cache
109         */
110        public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
111            return new FileIdempotentRepository(store, cache);
112        }
113    
114        @ManagedOperation(description = "Adds the key to the store")
115        public boolean add(String key) {
116            synchronized (cache) {
117                if (cache.containsKey(key)) {
118                    return false;
119                } else {
120                    cache.put(key, key);
121                    if (fileStore.length() < maxFileStoreSize) {
122                        // just append to store
123                        appendToStore(key);
124                    } else {
125                        // trunk store and flush the cache
126                        trunkStore();
127                    }
128    
129                    return true;
130                }
131            }
132        }
133    
134        @ManagedOperation(description = "Does the store contain the given key")
135        public boolean contains(String key) {
136            synchronized (cache) {
137                return cache.containsKey(key);
138            }
139        }
140    
141        @ManagedOperation(description = "Remove the key from the store")
142        public boolean remove(String key) {
143            boolean answer;
144            synchronized (cache) {
145                answer = cache.remove(key) != null;
146                // trunk store and flush the cache on remove
147                trunkStore();
148            }
149            return answer;
150        }
151    
152        public boolean confirm(String key) {
153            // noop
154            return true;
155        }
156    
157        public File getFileStore() {
158            return fileStore;
159        }
160    
161        public void setFileStore(File fileStore) {
162            this.fileStore = fileStore;
163        }
164    
165        @ManagedAttribute(description = "The file path for the store")
166        public String getFilePath() {
167            return fileStore.getPath();
168        }
169    
170        public Map<String, Object> getCache() {
171            return cache;
172        }
173    
174        public void setCache(Map<String, Object> cache) {
175            this.cache = cache;
176        }
177    
178        @ManagedAttribute(description = "The maximum file size for the file store in bytes")
179        public long getMaxFileStoreSize() {
180            return maxFileStoreSize;
181        }
182    
183        /**
184         * Sets the maximum file size for the file store in bytes.
185         * <p/>
186         * The default is 1mb.
187         */
188        @ManagedAttribute(description = "The maximum file size for the file store in bytes")
189        public void setMaxFileStoreSize(long maxFileStoreSize) {
190            this.maxFileStoreSize = maxFileStoreSize;
191        }
192    
193        /**
194         * Sets the cache size
195         */
196        public void setCacheSize(int size) {
197            if (cache != null) {
198                cache.clear();
199            }
200            cache = new LRUCache<String, Object>(size);
201        }
202    
203        @ManagedAttribute(description = "The current cache size")
204        public int getCacheSize() {
205            if (cache != null) {
206                return cache.size();
207            }
208            return 0;
209        }
210    
211        /**
212         * Reset and clears the store to force it to reload from file
213         */
214        @ManagedOperation(description = "Reset and reloads the file store")
215        public synchronized void reset() {
216            synchronized (cache) {
217                // trunk and clear, before we reload the store
218                trunkStore();
219                cache.clear();
220                loadStore();
221            }
222        }
223    
224        /**
225         * Appends the given message id to the file store
226         *
227         * @param messageId  the message id
228         */
229        protected void appendToStore(final String messageId) {
230            LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
231            FileOutputStream fos = null;
232            try {
233                // create store parent directory if missing
234                File storeParentDirectory = fileStore.getParentFile();
235                if (storeParentDirectory != null && !storeParentDirectory.exists()) {
236                    LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
237                    if (fileStore.getParentFile().mkdirs()) {
238                        LOG.info("Parent directory of file store {} successfully created.", fileStore);
239                    } else {
240                        LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
241                    }
242                }
243                // create store if missing
244                if (!fileStore.exists()) {
245                    FileUtil.createNewFile(fileStore);
246                }
247                // append to store
248                fos = new FileOutputStream(fileStore, true);
249                fos.write(messageId.getBytes());
250                fos.write(STORE_DELIMITER.getBytes());
251            } catch (IOException e) {
252                throw ObjectHelper.wrapRuntimeCamelException(e);
253            } finally {
254                IOHelper.close(fos, "Appending to file idempotent repository", LOG);
255            }
256        }
257    
258        /**
259         * Trunks the file store when the max store size is hit by rewriting the 1st level cache
260         * to the file store.
261         */
262        protected void trunkStore() {
263            if (fileStore == null || !fileStore.exists()) {
264                return;
265            }
266    
267            LOG.info("Trunking idempotent filestore: {}", fileStore);
268            FileOutputStream fos = null;
269            try {
270                fos = new FileOutputStream(fileStore);
271                for (String key : cache.keySet()) {
272                    fos.write(key.getBytes());
273                    fos.write(STORE_DELIMITER.getBytes());
274                }
275            } catch (IOException e) {
276                throw ObjectHelper.wrapRuntimeCamelException(e);
277            } finally {
278                IOHelper.close(fos, "Trunking file idempotent repository", LOG);
279            }
280        }
281    
282        /**
283         * Loads the given file store into the 1st level cache
284         */
285        protected void loadStore() {
286            if (fileStore == null || !fileStore.exists()) {
287                return;
288            }
289    
290            LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
291    
292            cache.clear();
293            Scanner scanner = null;
294            try {
295                scanner = new Scanner(fileStore);
296                scanner.useDelimiter(STORE_DELIMITER);
297                while (scanner.hasNextLine()) {
298                    String line = scanner.nextLine();
299                    cache.put(line, line);
300                }
301            } catch (IOException e) {
302                throw ObjectHelper.wrapRuntimeCamelException(e);
303            } finally {
304                if (scanner != null) {
305                    scanner.close();
306                }
307            }
308    
309            LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
310        }
311    
312        @Override
313        protected void doStart() throws Exception {
314            // init store if not loaded before
315            if (init.compareAndSet(false, true)) {
316                loadStore();
317            }
318        }
319    
320        @Override
321        protected void doStop() throws Exception {
322            // reset will trunk and clear the cache
323            trunkStore();
324            cache.clear();
325            init.set(false);
326        }
327    
328    }