001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.idempotent; 018 019 import java.util.Map; 020 021 import org.apache.camel.api.management.ManagedAttribute; 022 import org.apache.camel.api.management.ManagedOperation; 023 import org.apache.camel.api.management.ManagedResource; 024 import org.apache.camel.spi.IdempotentRepository; 025 import org.apache.camel.support.ServiceSupport; 026 import org.apache.camel.util.LRUCache; 027 028 /** 029 * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 030 * <p/> 031 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a 032 * memory leak. 033 * 034 * @version 035 */ 036 @ManagedResource(description = "Memory based idempotent repository") 037 public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { 038 private Map<String, Object> cache; 039 private int cacheSize; 040 041 public MemoryIdempotentRepository() { 042 this.cache = new LRUCache<String, Object>(1000); 043 } 044 045 public MemoryIdempotentRepository(Map<String, Object> set) { 046 this.cache = set; 047 } 048 049 /** 050 * Creates a new memory based repository using a {@link LRUCache} 051 * with a default of 1000 entries in the cache. 052 */ 053 public static IdempotentRepository<String> memoryIdempotentRepository() { 054 return new MemoryIdempotentRepository(); 055 } 056 057 /** 058 * Creates a new memory based repository using a {@link LRUCache}. 059 * 060 * @param cacheSize the cache size 061 */ 062 public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) { 063 return memoryIdempotentRepository(new LRUCache<String, Object>(cacheSize)); 064 } 065 066 /** 067 * Creates a new memory based repository using the given {@link Map} to 068 * use to store the processed message ids. 069 * <p/> 070 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a 071 * memory leak. 072 * 073 * @param cache the cache 074 */ 075 public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) { 076 return new MemoryIdempotentRepository(cache); 077 } 078 079 @ManagedOperation(description = "Adds the key to the store") 080 public boolean add(String key) { 081 synchronized (cache) { 082 if (cache.containsKey(key)) { 083 return false; 084 } else { 085 cache.put(key, key); 086 return true; 087 } 088 } 089 } 090 091 @ManagedOperation(description = "Does the store contain the given key") 092 public boolean contains(String key) { 093 synchronized (cache) { 094 return cache.containsKey(key); 095 } 096 } 097 098 @ManagedOperation(description = "Remove the key from the store") 099 public boolean remove(String key) { 100 synchronized (cache) { 101 return cache.remove(key) != null; 102 } 103 } 104 105 public boolean confirm(String key) { 106 // noop 107 return true; 108 } 109 110 public Map<String, Object> getCache() { 111 return cache; 112 } 113 114 @ManagedAttribute(description = "The current cache size") 115 public int getCacheSize() { 116 return cache.size(); 117 } 118 119 public void setCacheSize(int cacheSize) { 120 this.cacheSize = cacheSize; 121 } 122 123 @Override 124 protected void doStart() throws Exception { 125 if (cacheSize > 0) { 126 cache = new LRUCache<String, Object>(cacheSize); 127 } 128 } 129 130 @Override 131 protected void doStop() throws Exception { 132 cache.clear(); 133 } 134 }