001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.FailedToCreateProducerException;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.camel.ProducerCallback;
032    import org.apache.camel.ServicePoolAware;
033    import org.apache.camel.processor.UnitOfWorkProducer;
034    import org.apache.camel.spi.ServicePool;
035    import org.apache.camel.support.ServiceSupport;
036    import org.apache.camel.util.AsyncProcessorConverterHelper;
037    import org.apache.camel.util.CamelContextHelper;
038    import org.apache.camel.util.EventHelper;
039    import org.apache.camel.util.LRUCache;
040    import org.apache.camel.util.ServiceHelper;
041    import org.apache.camel.util.StopWatch;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * Cache containing created {@link Producer}.
047     *
048     * @version 
049     */
050    public class ProducerCache extends ServiceSupport {
051        private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class);
052    
053        private final CamelContext camelContext;
054        private final ServicePool<Endpoint, Producer> pool;
055        private final Map<String, Producer> producers;
056        private final Object source;
057        private boolean eventNotifierEnabled = true;
058    
059        public ProducerCache(Object source, CamelContext camelContext) {
060            this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
061        }
062    
063        public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
064            this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
065        }
066    
067        public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
068            this(source, camelContext, camelContext.getProducerServicePool(), cache);
069        }
070    
071        public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
072            this.source = source;
073            this.camelContext = camelContext;
074            this.pool = producerServicePool;
075            this.producers = cache;
076        }
077    
078        public boolean isEventNotifierEnabled() {
079            return eventNotifierEnabled;
080        }
081    
082        public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
083            this.eventNotifierEnabled = eventNotifierEnabled;
084        }
085    
086        /**
087         * Creates the {@link LRUCache} to be used.
088         * <p/>
089         * This implementation returns a {@link LRUCache} instance.
090    
091         * @param cacheSize the cache size
092         * @return the cache
093         */
094        protected static LRUCache<String, Producer> createLRUCache(int cacheSize) {
095            // Use a regular cache as we want to ensure that the lifecycle of the producers
096            // being cache is properly handled, such as they are stopped when being evicted
097            // or when this cache is stopped. This is needed as some producers requires to
098            // be stopped so they can shutdown internal resources that otherwise may cause leaks
099            return new LRUCache<String, Producer>(cacheSize);
100        }
101    
102        public CamelContext getCamelContext() {
103            return camelContext;
104        }
105    
106        /**
107         * Gets the source which uses this cache
108         *
109         * @return the source
110         */
111        public Object getSource() {
112            return source;
113        }
114    
115        /**
116         * Acquires a pooled producer which you <b>must</b> release back again after usage using the
117         * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
118         *
119         * @param endpoint the endpoint
120         * @return the producer
121         */
122        public Producer acquireProducer(Endpoint endpoint) {
123            return doGetProducer(endpoint, true);
124        }
125    
126        /**
127         * Releases an acquired producer back after usage.
128         *
129         * @param endpoint the endpoint
130         * @param producer the producer to release
131         * @throws Exception can be thrown if error stopping producer if that was needed.
132         */
133        public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
134            if (producer instanceof ServicePoolAware) {
135                // release back to the pool
136                pool.release(endpoint, producer);
137            } else if (!producer.isSingleton()) {
138                // stop and shutdown non-singleton producers as we should not leak resources
139                ServiceHelper.stopAndShutdownService(producer);
140            }
141        }
142    
143        /**
144         * Starts the {@link Producer} to be used for sending to the given endpoint
145         * <p/>
146         * This can be used to early start the {@link Producer} to ensure it can be created,
147         * such as when Camel is started. This allows to fail fast in case the {@link Producer}
148         * could not be started.
149         *
150         * @param endpoint the endpoint to send the exchange to
151         * @throws Exception is thrown if failed to create or start the {@link Producer}
152         */
153        public void startProducer(Endpoint endpoint) throws Exception {
154            Producer producer = acquireProducer(endpoint);
155            releaseProducer(endpoint, producer);
156        }
157    
158        /**
159         * Sends the exchange to the given endpoint.
160         * <p>
161         * This method will <b>not</b> throw an exception. If processing of the given
162         * Exchange failed then the exception is stored on the provided Exchange
163         *
164         * @param endpoint the endpoint to send the exchange to
165         * @param exchange the exchange to send
166         */
167        public void send(Endpoint endpoint, Exchange exchange) {
168            sendExchange(endpoint, null, null, exchange);
169        }
170    
171        /**
172         * Sends an exchange to an endpoint using a supplied
173         * {@link Processor} to populate the exchange
174         * <p>
175         * This method will <b>not</b> throw an exception. If processing of the given
176         * Exchange failed then the exception is stored on the return Exchange
177         *
178         * @param endpoint the endpoint to send the exchange to
179         * @param processor the transformer used to populate the new exchange
180         * @throws org.apache.camel.CamelExecutionException is thrown if sending failed
181         * @return the exchange
182         */
183        public Exchange send(Endpoint endpoint, Processor processor) {
184            return sendExchange(endpoint, null, processor, null);
185        }
186    
187        /**
188         * Sends an exchange to an endpoint using a supplied
189         * {@link Processor} to populate the exchange
190         * <p>
191         * This method will <b>not</b> throw an exception. If processing of the given
192         * Exchange failed then the exception is stored on the return Exchange
193         *
194         * @param endpoint the endpoint to send the exchange to
195         * @param pattern the message {@link ExchangePattern} such as
196         *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
197         * @param processor the transformer used to populate the new exchange
198         * @return the exchange
199         */
200        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
201            return sendExchange(endpoint, pattern, processor, null);
202        }
203    
204        /**
205         * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
206         * <p/>
207         * If an exception was thrown during processing, it would be set on the given Exchange
208         *
209         * @param endpoint  the endpoint to send the exchange to
210         * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
211         * @param pattern   the exchange pattern, can be <tt>null</tt>
212         * @param callback  the callback
213         * @return the response from the callback
214         * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
215         */
216        public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
217            T answer = null;
218    
219            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
220            Producer producer = doGetProducer(endpoint, true);
221    
222            if (producer == null) {
223                if (isStopped()) {
224                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
225                    return null;
226                } else {
227                    throw new IllegalStateException("No producer, this processor has not been started: " + this);
228                }
229            }
230    
231            StopWatch watch = null;
232            if (eventNotifierEnabled && exchange != null) {
233                // record timing for sending the exchange using the producer
234                watch = new StopWatch();
235            }
236    
237            try {
238                if (eventNotifierEnabled && exchange != null) {
239                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
240                }
241                // invoke the callback
242                answer = callback.doInProducer(producer, exchange, pattern);
243            } catch (Throwable e) {
244                if (exchange != null) {
245                    exchange.setException(e);
246                }
247            } finally {
248                if (eventNotifierEnabled && exchange != null) {
249                    long timeTaken = watch.stop();
250                    // emit event that the exchange was sent to the endpoint
251                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
252                }
253                if (producer instanceof ServicePoolAware) {
254                    // release back to the pool
255                    pool.release(endpoint, producer);
256                } else if (!producer.isSingleton()) {
257                    // stop and shutdown non-singleton producers as we should not leak resources
258                    try {
259                        ServiceHelper.stopAndShutdownService(producer);
260                    } catch (Exception e) {
261                        // ignore and continue
262                        LOG.warn("Error stopping/shutting down producer: " + producer, e);
263                    }
264                }
265            }
266    
267            return answer;
268        }
269    
270        /**
271         * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
272         * <p/>
273         * If an exception was thrown during processing, it would be set on the given Exchange
274         *
275         * @param endpoint         the endpoint to send the exchange to
276         * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
277         * @param pattern          the exchange pattern, can be <tt>null</tt>
278         * @param callback         the asynchronous callback
279         * @param producerCallback the producer template callback to be executed
280         * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
281         */
282        public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
283                                         final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
284            boolean sync = true;
285    
286            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
287            final Producer producer = doGetProducer(endpoint, true);
288    
289            if (producer == null) {
290                if (isStopped()) {
291                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
292                    return false;
293                } else {
294                    throw new IllegalStateException("No producer, this processor has not been started: " + this);
295                }
296            }
297    
298            // record timing for sending the exchange using the producer
299            final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;
300    
301            try {
302                if (eventNotifierEnabled && exchange != null) {
303                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
304                }
305                // invoke the callback
306                AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
307                sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
308                    @Override
309                    public void done(boolean doneSync) {
310                        try {
311                            if (eventNotifierEnabled && watch != null) {
312                                long timeTaken = watch.stop();
313                                // emit event that the exchange was sent to the endpoint
314                                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
315                            }
316    
317                            if (producer instanceof ServicePoolAware) {
318                                // release back to the pool
319                                pool.release(endpoint, producer);
320                            } else if (!producer.isSingleton()) {
321                                // stop and shutdown non-singleton producers as we should not leak resources
322                                try {
323                                    ServiceHelper.stopAndShutdownService(producer);
324                                } catch (Exception e) {
325                                    // ignore and continue
326                                    LOG.warn("Error stopping/shutting down producer: " + producer, e);
327                                }
328                            }
329                        } finally {
330                            callback.done(doneSync);
331                        }
332                    }
333                });
334            } catch (Throwable e) {
335                // ensure exceptions is caught and set on the exchange
336                if (exchange != null) {
337                    exchange.setException(e);
338                }
339            }
340    
341            return sync;
342        }
343    
344        protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
345                                        final Processor processor, Exchange exchange) {
346            return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
347                public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
348                    if (exchange == null) {
349                        exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
350                    }
351    
352                    if (processor != null) {
353                        // lets populate using the processor callback
354                        try {
355                            processor.process(exchange);
356                        } catch (Exception e) {
357                            // populate failed so return
358                            exchange.setException(e);
359                            return exchange;
360                        }
361                    }
362    
363                    // now lets dispatch
364                    LOG.debug(">>>> {} {}", endpoint, exchange);
365    
366                    // set property which endpoint we send to
367                    exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
368    
369                    // send the exchange using the processor
370                    StopWatch watch = null;
371                    try {
372                        if (eventNotifierEnabled) {
373                            watch = new StopWatch();
374                            EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
375                        }
376                        // ensure we run in an unit of work
377                        Producer target = new UnitOfWorkProducer(producer);
378                        target.process(exchange);
379                    } catch (Throwable e) {
380                        // ensure exceptions is caught and set on the exchange
381                        exchange.setException(e);
382                    } finally {
383                        // emit event that the exchange was sent to the endpoint
384                        if (eventNotifierEnabled && watch != null) {
385                            long timeTaken = watch.stop();
386                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
387                        }
388                    }
389                    return exchange;
390                }
391            });
392        }
393    
394        protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
395            String key = endpoint.getEndpointUri();
396            Producer answer = producers.get(key);
397            if (pooled && answer == null) {
398                // try acquire from connection pool
399                answer = pool.acquire(endpoint);
400            }
401    
402            if (answer == null) {
403                // create a new producer
404                try {
405                    answer = endpoint.createProducer();
406                    // must then start service so producer is ready to be used
407                    ServiceHelper.startService(answer);
408                } catch (Exception e) {
409                    throw new FailedToCreateProducerException(endpoint, e);
410                }
411    
412                // add producer to cache or pool if applicable
413                if (pooled && answer instanceof ServicePoolAware) {
414                    LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
415                    answer = pool.addAndAcquire(endpoint, answer);
416                } else if (answer.isSingleton()) {
417                    LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
418                    producers.put(key, answer);
419                }
420            }
421    
422            return answer;
423        }
424    
425        protected void doStart() throws Exception {
426            ServiceHelper.startServices(producers.values());
427            ServiceHelper.startServices(pool);
428        }
429    
430        protected void doStop() throws Exception {
431            // when stopping we intend to shutdown
432            ServiceHelper.stopAndShutdownService(pool);
433            ServiceHelper.stopAndShutdownServices(producers.values());
434            producers.clear();
435        }
436    
437        /**
438         * Returns the current size of the cache
439         *
440         * @return the current size
441         */
442        public int size() {
443            int size = producers.size();
444            size += pool.size();
445    
446            LOG.trace("size = {}", size);
447            return size;
448        }
449    
450        /**
451         * Gets the maximum cache size (capacity).
452         * <p/>
453         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
454         *
455         * @return the capacity
456         */
457        public int getCapacity() {
458            int capacity = -1;
459            if (producers instanceof LRUCache) {
460                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
461                capacity = cache.getMaxCacheSize();
462            }
463            return capacity;
464        }
465    
466        /**
467         * Gets the cache hits statistic
468         * <p/>
469         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
470         *
471         * @return the hits
472         */
473        public long getHits() {
474            long hits = -1;
475            if (producers instanceof LRUCache) {
476                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
477                hits = cache.getHits();
478            }
479            return hits;
480        }
481    
482        /**
483         * Gets the cache misses statistic
484         * <p/>
485         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
486         *
487         * @return the misses
488         */
489        public long getMisses() {
490            long misses = -1;
491            if (producers instanceof LRUCache) {
492                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
493                misses = cache.getMisses();
494            }
495            return misses;
496        }
497    
498        /**
499         * Gets the cache evicted statistic
500         * <p/>
501         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
502         *
503         * @return the evicted
504         */
505        public long getEvicted() {
506            long evicted = -1;
507            if (producers instanceof LRUCache) {
508                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
509                evicted = cache.getEvicted();
510            }
511            return evicted;
512        }
513    
514        /**
515         * Resets the cache statistics
516         */
517        public void resetCacheStatistics() {
518            if (producers instanceof LRUCache) {
519                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
520                cache.resetStatistics();
521            }
522        }
523    
524        /**
525         * Purges this cache
526         */
527        public synchronized void purge() {
528            producers.clear();
529            pool.purge();
530        }
531    
532        @Override
533        public String toString() {
534            return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
535        }
536    }