/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Map;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducerCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache containing created {@link Producer}.
 * <p/>
 * Singleton producers are kept in a map keyed by endpoint uri, {@link ServicePoolAware}
 * producers are borrowed from / returned to the producer {@link ServicePool}, and any other
 * producer is created per use and stopped again when released.
 *
 * @version 
 */
public class ProducerCache extends ServiceSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class);

    private final CamelContext camelContext;
    private final ServicePool<Endpoint, Producer> pool;
    private final Map<String, Producer> producers;
    private final Object source;
    private boolean eventNotifierEnabled = true;

    public ProducerCache(Object source, CamelContext camelContext) {
        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
    }

    public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
        this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
    }

    public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
        this(source, camelContext, camelContext.getProducerServicePool(), cache);
    }

    public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
        this.source = source;
        this.camelContext = camelContext;
        this.pool = producerServicePool;
        this.producers = cache;
    }

    public boolean isEventNotifierEnabled() {
        return eventNotifierEnabled;
    }

    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
        this.eventNotifierEnabled = eventNotifierEnabled;
    }

    /**
     * Creates the {@link LRUCache} to be used.
     * <p/>
     * This implementation returns a {@link LRUCache} instance.
     *
     * @param cacheSize the cache size
     * @return the cache
     */
    protected static LRUCache<String, Producer> createLRUCache(int cacheSize) {
        // Use a regular cache as we want to ensure that the lifecycle of the producers
        // being cached is properly handled, such as they are stopped when being evicted
        // or when this cache is stopped. This is needed as some producers require to
        // be stopped so they can shutdown internal resources that otherwise may cause leaks
        return new LRUCache<String, Producer>(cacheSize);
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    /**
     * Gets the source which uses this cache
     *
     * @return the source
     */
    public Object getSource() {
        return source;
    }

    /**
     * Acquires a pooled producer which you <b>must</b> release back again after usage using the
     * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
     *
     * @param endpoint the endpoint
     * @return the producer
     */
    public Producer acquireProducer(Endpoint endpoint) {
        return doGetProducer(endpoint, true);
    }

    /**
     * Releases an acquired producer back after usage.
     *
     * @param endpoint the endpoint
     * @param producer the producer to release
     * @throws Exception can be thrown if error stopping producer if that was needed.
     */
    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
        if (producer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, producer);
        } else if (!producer.isSingleton()) {
            // stop and shutdown non-singleton producers as we should not leak resources
            ServiceHelper.stopAndShutdownService(producer);
        }
    }

    /**
     * Starts the {@link Producer} to be used for sending to the given endpoint
     * <p/>
     * This can be used to early start the {@link Producer} to ensure it can be created,
     * such as when Camel is started. This allows to fail fast in case the {@link Producer}
     * could not be started.
     *
     * @param endpoint the endpoint to send the exchange to
     * @throws Exception is thrown if failed to create or start the {@link Producer}
     */
    public void startProducer(Endpoint endpoint) throws Exception {
        Producer producer = acquireProducer(endpoint);
        releaseProducer(endpoint, producer);
    }

    /**
     * Sends the exchange to the given endpoint.
     * <p>
     * This method will <b>not</b> throw an exception. If processing of the given
     * Exchange failed then the exception is stored on the provided Exchange
     *
     * @param endpoint the endpoint to send the exchange to
     * @param exchange the exchange to send
     */
    public void send(Endpoint endpoint, Exchange exchange) {
        sendExchange(endpoint, null, null, exchange);
    }

    /**
     * Sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * This method will <b>not</b> throw an exception. If processing of the given
     * Exchange failed then the exception is stored on the return Exchange
     *
     * @param endpoint the endpoint to send the exchange to
     * @param processor the transformer used to populate the new exchange
     * @return the exchange
     */
    public Exchange send(Endpoint endpoint, Processor processor) {
        return sendExchange(endpoint, null, processor, null);
    }

    /**
     * Sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * This method will <b>not</b> throw an exception. If processing of the given
     * Exchange failed then the exception is stored on the return Exchange
     *
     * @param endpoint the endpoint to send the exchange to
     * @param pattern the message {@link ExchangePattern} such as
     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
     * @param processor the transformer used to populate the new exchange
     * @return the exchange
     */
    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
        return sendExchange(endpoint, pattern, processor, null);
    }

    /**
     * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
     * <p/>
     * If an exception was thrown during processing, it would be set on the given Exchange.
     * When no exchange was given the exception cannot be stored anywhere and is logged instead.
     * <p/>
     * When an exchange is given and event notification is enabled, this method emits the
     * exchange sending / sent events around the callback.
     *
     * @param endpoint  the endpoint to send the exchange to
     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
     * @param pattern   the exchange pattern, can be <tt>null</tt>
     * @param callback  the callback
     * @return the response from the callback
     * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
     */
    public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
        T answer = null;

        // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
        Producer producer = doGetProducer(endpoint, true);

        if (producer == null) {
            if (isStopped()) {
                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
                return null;
            } else {
                throw new IllegalStateException("No producer, this processor has not been started: " + this);
            }
        }

        // decide once whether events are emitted for this call; a non-null watch means "yes".
        // re-reading the mutable flag later could pair a sent event with a missing watch (NPE)
        final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;

        try {
            if (watch != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
            }
            // invoke the callback
            answer = callback.doInProducer(producer, exchange, pattern);
        } catch (Throwable e) {
            if (exchange != null) {
                exchange.setException(e);
            } else {
                // there is no exchange to carry the failure, so at least make it visible
                LOG.warn("Error invoking producer callback for endpoint: " + endpoint
                        + ". There is no exchange to store the exception on.", e);
            }
        } finally {
            try {
                if (watch != null) {
                    long timeTaken = watch.stop();
                    // emit event that the exchange was sent to the endpoint
                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                }
            } finally {
                // the producer must be handed back even if the notification failed
                releaseAfterUse(endpoint, producer);
            }
        }

        return answer;
    }

    /**
     * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
     * <p/>
     * If an exception was thrown during processing, it would be set on the given Exchange.
     * When no exchange was given the exception cannot be stored anywhere and is logged instead.
     *
     * @param endpoint         the endpoint to send the exchange to
     * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
     * @param pattern          the exchange pattern, can be <tt>null</tt>
     * @param callback         the asynchronous callback
     * @param producerCallback the producer template callback to be executed
     * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
     */
    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
                                     final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
        boolean sync = true;

        // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
        final Producer producer = doGetProducer(endpoint, true);

        if (producer == null) {
            if (isStopped()) {
                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
                // nothing will ever complete this exchange asynchronously, so signal completion
                // now and report a synchronous finish; returning false here would leave the
                // caller waiting for a callback that never comes
                callback.done(true);
                return true;
            } else {
                throw new IllegalStateException("No producer, this processor has not been started: " + this);
            }
        }

        // record timing for sending the exchange using the producer;
        // a non-null watch also means the sending/sent events are emitted for this call
        final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;

        try {
            if (watch != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
            }
            // invoke the callback
            AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
            sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
                @Override
                public void done(boolean doneSync) {
                    try {
                        if (watch != null) {
                            long timeTaken = watch.stop();
                            // emit event that the exchange was sent to the endpoint
                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                        }
                        releaseAfterUse(endpoint, producer);
                    } finally {
                        callback.done(doneSync);
                    }
                }
            });
        } catch (Throwable e) {
            // ensure exceptions is caught and set on the exchange
            // NOTE(review): on this path the wrapped callback may never have run, in which case
            // the producer is not released and callback.done is not invoked - confirm with callers
            if (exchange != null) {
                exchange.setException(e);
            } else {
                LOG.warn("Error invoking async producer callback for endpoint: " + endpoint
                        + ". There is no exchange to store the exception on.", e);
            }
        }

        return sync;
    }

    /**
     * Sends the exchange (creating one from the producer if none is given) to the endpoint,
     * optionally populating it with the processor first, and running the producer in a
     * {@link UnitOfWorkProducer}.
     * <p/>
     * Failures are stored on the returned exchange rather than thrown.
     *
     * @param endpoint  the endpoint to send the exchange to
     * @param pattern   the exchange pattern used when a new exchange is created, can be <tt>null</tt>
     * @param processor optional processor to populate the exchange, can be <tt>null</tt>
     * @param exchange  the exchange to send, can be <tt>null</tt> to have one created
     * @return the exchange
     */
    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
                                    final Processor processor, Exchange exchange) {
        // doInProducer already emits the sending/sent events when it is handed an existing
        // exchange, so only emit them from the callback when the exchange is created in there;
        // otherwise every event would be fired twice for the same send
        final boolean notifyFromCallback = exchange == null;

        return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
            @Override
            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
                if (exchange == null) {
                    exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
                }

                if (processor != null) {
                    // lets populate using the processor callback
                    try {
                        processor.process(exchange);
                    } catch (Exception e) {
                        // populate failed so return
                        exchange.setException(e);
                        return exchange;
                    }
                }

                // now lets dispatch
                LOG.debug(">>>> {} {}", endpoint, exchange);

                // set property which endpoint we send to
                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());

                // send the exchange using the processor
                StopWatch watch = null;
                try {
                    if (notifyFromCallback && eventNotifierEnabled) {
                        watch = new StopWatch();
                        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
                    }
                    // ensure we run in an unit of work
                    Producer target = new UnitOfWorkProducer(producer);
                    target.process(exchange);
                } catch (Throwable e) {
                    // ensure exceptions is caught and set on the exchange
                    exchange.setException(e);
                } finally {
                    // emit event that the exchange was sent to the endpoint
                    if (watch != null) {
                        long timeTaken = watch.stop();
                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                    }
                }
                return exchange;
            }
        });
    }

    /**
     * Looks up, borrows or creates (and starts) the producer for the given endpoint.
     * <p/>
     * Newly created producers are added to the service pool when they are
     * {@link ServicePoolAware} and pooling is requested, or to the cache when they are singletons.
     *
     * @param endpoint the endpoint
     * @param pooled   whether the producer service pool may be used
     * @return the producer
     * @throws FailedToCreateProducerException if the producer could not be created or started
     */
    protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
        String key = endpoint.getEndpointUri();
        Producer answer = producers.get(key);
        if (pooled && answer == null) {
            // try acquire from connection pool
            answer = pool.acquire(endpoint);
        }

        if (answer == null) {
            // create a new producer
            try {
                answer = endpoint.createProducer();
                // must then start service so producer is ready to be used
                ServiceHelper.startService(answer);
            } catch (Exception e) {
                throw new FailedToCreateProducerException(endpoint, e);
            }

            // add producer to cache or pool if applicable
            if (pooled && answer instanceof ServicePoolAware) {
                LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
                answer = pool.addAndAcquire(endpoint, answer);
            } else if (answer.isSingleton()) {
                LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
                producers.put(key, answer);
            }
        }

        return answer;
    }

    protected void doStart() throws Exception {
        ServiceHelper.startServices(producers.values());
        ServiceHelper.startServices(pool);
    }

    protected void doStop() throws Exception {
        // when stopping we intend to shutdown
        ServiceHelper.stopAndShutdownService(pool);
        ServiceHelper.stopAndShutdownServices(producers.values());
        producers.clear();
    }

    /**
     * Returns the current size of the cache
     *
     * @return the current size
     */
    public int size() {
        int size = producers.size();
        size += pool.size();

        LOG.trace("size = {}", size);
        return size;
    }

    /**
     * Gets the maximum cache size (capacity).
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the capacity
     */
    public int getCapacity() {
        LRUCache<String, Producer> cache = lruCacheOrNull();
        return cache != null ? cache.getMaxCacheSize() : -1;
    }

    /**
     * Gets the cache hits statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the hits
     */
    public long getHits() {
        LRUCache<String, Producer> cache = lruCacheOrNull();
        return cache != null ? cache.getHits() : -1;
    }

    /**
     * Gets the cache misses statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the misses
     */
    public long getMisses() {
        LRUCache<String, Producer> cache = lruCacheOrNull();
        return cache != null ? cache.getMisses() : -1;
    }

    /**
     * Gets the cache evicted statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the evicted
     */
    public long getEvicted() {
        LRUCache<String, Producer> cache = lruCacheOrNull();
        return cache != null ? cache.getEvicted() : -1;
    }

    /**
     * Resets the cache statistics
     */
    public void resetCacheStatistics() {
        LRUCache<String, Producer> cache = lruCacheOrNull();
        if (cache != null) {
            cache.resetStatistics();
        }
    }

    /**
     * Purges this cache
     */
    public synchronized void purge() {
        producers.clear();
        pool.purge();
    }

    @Override
    public String toString() {
        return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
    }

    /**
     * Hands a producer back after a send: pooled producers go back to the pool, other
     * non-singleton producers are stopped (a failure to stop is logged, not thrown).
     */
    private void releaseAfterUse(Endpoint endpoint, Producer producer) {
        if (producer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, producer);
        } else if (!producer.isSingleton()) {
            // stop and shutdown non-singleton producers as we should not leak resources
            try {
                ServiceHelper.stopAndShutdownService(producer);
            } catch (Exception e) {
                // ignore and continue
                LOG.warn("Error stopping/shutting down producer: " + producer, e);
            }
        }
    }

    /**
     * Returns the backing cache as a {@link LRUCache}, or <tt>null</tt> if a custom cache was supplied.
     */
    private LRUCache<String, Producer> lruCacheOrNull() {
        if (producers instanceof LRUCache) {
            return (LRUCache<String, Producer>) producers;
        }
        return null;
    }
}