001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.CamelContext;
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.FailedToCreateConsumerException;
025    import org.apache.camel.IsSingleton;
026    import org.apache.camel.PollingConsumer;
027    import org.apache.camel.support.ServiceSupport;
028    import org.apache.camel.util.CamelContextHelper;
029    import org.apache.camel.util.LRUCache;
030    import org.apache.camel.util.ServiceHelper;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * Cache containing created {@link org.apache.camel.Consumer}.
036     *
037     * @version 
038     */
039    public class ConsumerCache extends ServiceSupport {
040        private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);
041        private final CamelContext camelContext;
042        private final Map<String, PollingConsumer> consumers;
043        private final Object source;
044    
045        public ConsumerCache(Object source, CamelContext camelContext) {
046            this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
047        }
048    
049        public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
050            this(source, camelContext, createLRUCache(cacheSize));
051        }
052    
053        public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache) {
054            this.camelContext = camelContext;
055            this.consumers = cache;
056            this.source = source;
057        }
058    
059        /**
060         * Creates the {@link LRUCache} to be used.
061         * <p/>
062         * This implementation returns a {@link LRUCache} instance.
063    
064         * @param cacheSize the cache size
065         * @return the cache
066         */
067        protected static LRUCache<String, PollingConsumer> createLRUCache(int cacheSize) {
068            // Use a regular cache as we want to ensure that the lifecycle of the consumers
069            // being cache is properly handled, such as they are stopped when being evicted
070            // or when this cache is stopped. This is needed as some consumers requires to
071            // be stopped so they can shutdown internal resources that otherwise may cause leaks
072            return new LRUCache<String, PollingConsumer>(cacheSize);
073        }
074    
075        public synchronized PollingConsumer getConsumer(Endpoint endpoint) {
076            String key = endpoint.getEndpointUri();
077            PollingConsumer answer = consumers.get(key);
078            if (answer == null) {
079                try {
080                    answer = endpoint.createPollingConsumer();
081                    answer.start();
082                } catch (Exception e) {
083                    throw new FailedToCreateConsumerException(endpoint, e);
084                }
085    
086                boolean singleton = true;
087                if (answer instanceof IsSingleton) {
088                    singleton = ((IsSingleton) answer).isSingleton();
089                }
090    
091                if (singleton) {
092                    LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer);
093                    consumers.put(key, answer);
094                } else {
095                    LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key);
096                }
097            }
098            return answer;
099        }
100    
101        public Exchange receive(Endpoint endpoint) {
102            LOG.debug("<<<< {}", endpoint);
103    
104            PollingConsumer consumer = getConsumer(endpoint);
105            return consumer.receive();
106        }
107    
108        public Exchange receive(Endpoint endpoint, long timeout) {
109            LOG.debug("<<<< {}", endpoint);
110    
111            PollingConsumer consumer = getConsumer(endpoint);
112            return consumer.receive(timeout);
113        }
114    
115        public Exchange receiveNoWait(Endpoint endpoint) {
116            LOG.debug("<<<< {}", endpoint);
117    
118            PollingConsumer consumer = getConsumer(endpoint);
119            return consumer.receiveNoWait();
120        }
121        
122        public CamelContext getCamelContext() {
123            return camelContext;
124        }
125    
126        /**
127         * Gets the source which uses this cache
128         *
129         * @return the source
130         */
131        public Object getSource() {
132            return source;
133        }
134    
135        /**
136         * Returns the current size of the cache
137         *
138         * @return the current size
139         */
140        public int size() {
141            int size = consumers.size();
142            LOG.trace("size = {}", size);
143            return size;
144        }
145    
146        /**
147         * Gets the maximum cache size (capacity).
148         * <p/>
149         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
150         *
151         * @return the capacity
152         */
153        public int getCapacity() {
154            int capacity = -1;
155            if (consumers instanceof LRUCache) {
156                LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
157                capacity = cache.getMaxCacheSize();
158            }
159            return capacity;
160        }
161    
162        /**
163         * Gets the cache hits statistic
164         * <p/>
165         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
166         *
167         * @return the hits
168         */
169        public long getHits() {
170            long hits = -1;
171            if (consumers instanceof LRUCache) {
172                LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
173                hits = cache.getHits();
174            }
175            return hits;
176        }
177    
178        /**
179         * Gets the cache misses statistic
180         * <p/>
181         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
182         *
183         * @return the misses
184         */
185        public long getMisses() {
186            long misses = -1;
187            if (consumers instanceof LRUCache) {
188                LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
189                misses = cache.getMisses();
190            }
191            return misses;
192        }
193    
194        /**
195         * Gets the cache evicted statistic
196         * <p/>
197         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
198         *
199         * @return the evicted
200         */
201        public long getEvicted() {
202            long evicted = -1;
203            if (consumers instanceof LRUCache) {
204                LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
205                evicted = cache.getEvicted();
206            }
207            return evicted;
208        }
209    
210        /**
211         * Resets the cache statistics
212         */
213        public void resetCacheStatistics() {
214            if (consumers instanceof LRUCache) {
215                LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
216                cache.resetStatistics();
217            }
218        }
219    
220        /**
221         * Purges this cache
222         */
223        public synchronized void purge() {
224            consumers.clear();
225        }
226    
227        @Override
228        public String toString() {
229            return "ConsumerCache for source: " + source + ", capacity: " + getCapacity();
230        }
231    
232        protected void doStart() throws Exception {
233            ServiceHelper.startServices(consumers.values());
234        }
235    
236        protected void doStop() throws Exception {
237            // when stopping we intend to shutdown
238            ServiceHelper.stopAndShutdownServices(consumers.values());
239            consumers.clear();
240        }
241    
242    }