/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Collections;
import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateConsumerException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumer;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache containing created and started {@link PollingConsumer}s, keyed by the endpoint URI.
 * <p/>
 * Only consumers that report themselves as singleton (or do not implement {@link IsSingleton})
 * are kept in the cache. Non-singleton consumers are created per call and are stopped again
 * by the <tt>receive</tt> methods once the call has completed.
 * <p/>
 * {@link #getConsumer(Endpoint)} and {@link #purge()} are <tt>synchronized</tt> on this instance.
 *
 * @version
 */
public class ConsumerCache extends ServiceSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);

    private final CamelContext camelContext;
    private final Map<String, PollingConsumer> consumers;
    private final Object source;

    /**
     * Creates a cache whose size is looked up from the given context using
     * {@link CamelContextHelper#getMaximumCachePoolSize(CamelContext)}.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     */
    public ConsumerCache(Object source, CamelContext camelContext) {
        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
    }

    /**
     * Creates a cache backed by a {@link LRUCache} of the given size.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cacheSize    the cache size
     */
    public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
        this(source, camelContext, createLRUCache(cacheSize));
    }

    /**
     * Creates a cache backed by the given (possibly custom) map.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cache        the map used to store the consumers, keyed by endpoint URI
     */
    public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache) {
        this.camelContext = camelContext;
        this.consumers = cache;
        this.source = source;
    }

    /**
     * Creates the {@link LRUCache} to be used.
     * <p/>
     * This implementation returns a {@link LRUCache} instance.
     *
     * @param cacheSize the cache size
     * @return the cache
     */
    protected static LRUCache<String, PollingConsumer> createLRUCache(int cacheSize) {
        // Use a regular cache as we want to ensure that the lifecycle of the consumers
        // being cached is properly handled, such as they are stopped when being evicted
        // or when this cache is stopped. This is needed as some consumers require to
        // be stopped so they can shutdown internal resources that otherwise may cause leaks
        return new LRUCache<String, PollingConsumer>(cacheSize);
    }

    /**
     * Gets a started polling consumer for the given endpoint.
     * <p/>
     * A cached consumer is returned when one exists for the endpoint URI. Otherwise a new
     * consumer is created and started, and it is added to the cache only if it is a singleton.
     * <p/>
     * A non-singleton consumer returned from this method is <b>not</b> tracked by this cache,
     * so a caller invoking this method directly is responsible for stopping it.
     *
     * @param endpoint the endpoint to consume from
     * @return the started consumer
     * @throws FailedToCreateConsumerException if the consumer could not be created or started
     */
    public synchronized PollingConsumer getConsumer(Endpoint endpoint) {
        String key = endpoint.getEndpointUri();
        PollingConsumer answer = consumers.get(key);
        if (answer != null) {
            return answer;
        }

        try {
            answer = endpoint.createPollingConsumer();
            answer.start();
        } catch (Exception e) {
            throw new FailedToCreateConsumerException(endpoint, e);
        }

        if (isSingleton(answer)) {
            LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer);
            consumers.put(key, answer);
        } else {
            LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key);
        }
        return answer;
    }

    /**
     * Receives an exchange from the endpoint, delegating to {@link PollingConsumer#receive()}.
     *
     * @param endpoint the endpoint to receive from
     * @return the value returned by the consumer
     */
    public Exchange receive(Endpoint endpoint) {
        LOG.debug("<<<< {}", endpoint);

        PollingConsumer consumer = getConsumer(endpoint);
        try {
            return consumer.receive();
        } finally {
            releaseConsumer(endpoint, consumer);
        }
    }

    /**
     * Receives an exchange from the endpoint, delegating to {@link PollingConsumer#receive(long)}.
     *
     * @param endpoint the endpoint to receive from
     * @param timeout  the timeout passed on to the consumer
     * @return the value returned by the consumer
     */
    public Exchange receive(Endpoint endpoint, long timeout) {
        LOG.debug("<<<< {}", endpoint);

        PollingConsumer consumer = getConsumer(endpoint);
        try {
            return consumer.receive(timeout);
        } finally {
            releaseConsumer(endpoint, consumer);
        }
    }

    /**
     * Receives an exchange from the endpoint, delegating to {@link PollingConsumer#receiveNoWait()}.
     *
     * @param endpoint the endpoint to receive from
     * @return the value returned by the consumer
     */
    public Exchange receiveNoWait(Endpoint endpoint) {
        LOG.debug("<<<< {}", endpoint);

        PollingConsumer consumer = getConsumer(endpoint);
        try {
            return consumer.receiveNoWait();
        } finally {
            releaseConsumer(endpoint, consumer);
        }
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    /**
     * Gets the source which uses this cache
     *
     * @return the source
     */
    public Object getSource() {
        return source;
    }

    /**
     * Returns the current size of the cache
     *
     * @return the current size
     */
    public int size() {
        int size = consumers.size();
        LOG.trace("size = {}", size);
        return size;
    }

    /**
     * Gets the maximum cache size (capacity).
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the capacity
     */
    public int getCapacity() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getMaxCacheSize() : -1;
    }

    /**
     * Gets the cache hits statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the hits
     */
    public long getHits() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getHits() : -1;
    }

    /**
     * Gets the cache misses statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the misses
     */
    public long getMisses() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getMisses() : -1;
    }

    /**
     * Gets the cache evicted statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the evicted
     */
    public long getEvicted() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getEvicted() : -1;
    }

    /**
     * Resets the cache statistics
     * <p/>
     * Does nothing if a custom cache was used.
     */
    public void resetCacheStatistics() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        if (cache != null) {
            cache.resetStatistics();
        }
    }

    /**
     * Purges this cache
     * <p/>
     * The cached consumers were started when they were added, so they are stopped and
     * shut down before being dropped; otherwise they could keep holding internal resources.
     * A failure to stop a consumer is logged and ignored, and the cache is cleared regardless.
     */
    public synchronized void purge() {
        try {
            ServiceHelper.stopAndShutdownServices(consumers.values());
        } catch (Exception e) {
            // best effort: purging must still empty the cache
            LOG.warn("Error stopping consumers while purging consumer cache. This exception is ignored.", e);
        }
        consumers.clear();
    }

    @Override
    public String toString() {
        return "ConsumerCache for source: " + source + ", capacity: " + getCapacity();
    }

    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startServices(consumers.values());
    }

    @Override
    protected void doStop() throws Exception {
        // when stopping we intend to shutdown
        ServiceHelper.stopAndShutdownServices(consumers.values());
        consumers.clear();
    }

    /**
     * Whether the consumer is regarded as a singleton; consumers which do not implement
     * {@link IsSingleton} are treated as singletons.
     */
    private static boolean isSingleton(PollingConsumer consumer) {
        if (consumer instanceof IsSingleton) {
            return ((IsSingleton) consumer).isSingleton();
        }
        return true;
    }

    /**
     * Stops and shuts down a non-singleton consumer after use; such consumers are never added
     * to the cache by {@link #getConsumer(Endpoint)}, so nothing else would stop them.
     */
    private void releaseConsumer(Endpoint endpoint, PollingConsumer consumer) {
        if (isSingleton(consumer)) {
            // singletons stay in the cache and are stopped by purge() / doStop()
            return;
        }
        try {
            ServiceHelper.stopAndShutdownServices(Collections.singleton(consumer));
        } catch (Exception e) {
            // best effort: must not mask the outcome of the receive call
            LOG.warn("Error stopping non-singleton consumer for endpoint: " + endpoint + ". This exception is ignored.", e);
        }
    }

    /**
     * Returns the backing map as a {@link LRUCache}, or <tt>null</tt> if a custom cache was used.
     */
    private LRUCache<String, PollingConsumer> lruCache() {
        if (consumers instanceof LRUCache) {
            return (LRUCache<String, PollingConsumer>) consumers;
        }
        return null;
    }

}