001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    import java.util.concurrent.Callable;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.Future;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.TimeoutException;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.CamelExecutionException;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.ExchangePattern;
031    import org.apache.camel.Message;
032    import org.apache.camel.NoSuchEndpointException;
033    import org.apache.camel.Processor;
034    import org.apache.camel.ProducerTemplate;
035    import org.apache.camel.spi.Synchronization;
036    import org.apache.camel.support.ServiceSupport;
037    import org.apache.camel.util.CamelContextHelper;
038    import org.apache.camel.util.ExchangeHelper;
039    import org.apache.camel.util.ObjectHelper;
040    import org.apache.camel.util.ServiceHelper;
041    
042    /**
043     * Template (named like Spring's TransactionTemplate & JmsTemplate
044     * et al) for working with Camel and sending {@link Message} instances in an
045     * {@link Exchange} to an {@link Endpoint}.
046     *
047     * @version 
048     */
049    public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
050        private final CamelContext camelContext;
051        private volatile ProducerCache producerCache;
052        private volatile ExecutorService executor;
053        private Endpoint defaultEndpoint;
054        private int maximumCacheSize;
055        private boolean eventNotifierEnabled = true;
056    
057        public DefaultProducerTemplate(CamelContext camelContext) {
058            this.camelContext = camelContext;
059        }
060    
061        public DefaultProducerTemplate(CamelContext camelContext, ExecutorService executor) {
062            this.camelContext = camelContext;
063            this.executor = executor;
064        }
065    
066        public DefaultProducerTemplate(CamelContext camelContext, Endpoint defaultEndpoint) {
067            this(camelContext);
068            this.defaultEndpoint = defaultEndpoint;
069        }
070    
071        public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
072            Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
073            return new DefaultProducerTemplate(camelContext, endpoint);
074        }
075    
076        public int getMaximumCacheSize() {
077            return maximumCacheSize;
078        }
079    
080        public void setMaximumCacheSize(int maximumCacheSize) {
081            this.maximumCacheSize = maximumCacheSize;
082        }
083    
084        public int getCurrentCacheSize() {
085            if (producerCache == null) {
086                return 0;
087            }
088            return producerCache.size();
089        }
090    
091        public boolean isEventNotifierEnabled() {
092            return eventNotifierEnabled;
093        }
094    
095        public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
096            this.eventNotifierEnabled = eventNotifierEnabled;
097            // if we already created the cache then adjust its setting as well
098            if (producerCache != null) {
099                producerCache.setEventNotifierEnabled(eventNotifierEnabled);
100            }
101        }
102    
103        public Exchange send(String endpointUri, Exchange exchange) {
104            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
105            return send(endpoint, exchange);
106        }
107    
108        public Exchange send(String endpointUri, Processor processor) {
109            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
110            return send(endpoint, processor);
111        }
112    
113        public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
114            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
115            return send(endpoint, pattern, processor);
116        }
117    
118        public Exchange send(Endpoint endpoint, Exchange exchange) {
119            getProducerCache().send(endpoint, exchange);
120            return exchange;
121        }
122    
123        public Exchange send(Endpoint endpoint, Processor processor) {
124            return getProducerCache().send(endpoint, processor);
125        }
126    
127        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
128            return getProducerCache().send(endpoint, pattern, processor);
129        }
130    
131        public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
132            Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
133            return extractResultBody(result, pattern);
134        }
135    
136        public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException {
137            Exchange result = send(endpoint, createSetBodyProcessor(body));
138            // must invoke extract result body in case of exception to be rethrown
139            extractResultBody(result);
140        }
141    
142        public void sendBody(String endpointUri, Object body) throws CamelExecutionException {
143            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
144            sendBody(endpoint, body);
145        }
146    
147        public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException {
148            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
149            Object result = sendBody(endpoint, pattern, body);
150            if (pattern.isOutCapable()) {
151                return result;
152            } else {
153                // return null if not OUT capable
154                return null;
155            }
156        }
157    
158        public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
159            sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
160        }
161    
162        public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
163            Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
164            // must invoke extract result body in case of exception to be rethrown
165            extractResultBody(result);
166        }
167    
168        public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
169                                        final String header, final Object headerValue) throws CamelExecutionException {
170            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
171            Object result = extractResultBody(exchange, pattern);
172            if (pattern.isOutCapable()) {
173                return result;
174            } else {
175                // return null if not OUT capable
176                return null;
177            }
178        }
179    
180        public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
181                                        final String header, final Object headerValue) throws CamelExecutionException {
182            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
183            Object result = extractResultBody(exchange, pattern);
184            if (pattern.isOutCapable()) {
185                return result;
186            } else {
187                // return null if not OUT capable
188                return null;
189            }
190        }
191    
192        public void sendBodyAndProperty(String endpointUri, final Object body,
193                                        final String property, final Object propertyValue) throws CamelExecutionException {
194            sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
195        }
196    
197        public void sendBodyAndProperty(Endpoint endpoint, final Object body,
198                                        final String property, final Object propertyValue) throws CamelExecutionException {
199            Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
200            // must invoke extract result body in case of exception to be rethrown
201            extractResultBody(result);
202        }
203    
204        public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
205                                          final String property, final Object propertyValue) throws CamelExecutionException {
206            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
207            Object result = extractResultBody(exchange, pattern);
208            if (pattern.isOutCapable()) {
209                return result;
210            } else {
211                // return null if not OUT capable
212                return null;
213            }
214        }
215    
216        public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
217                                          final String property, final Object propertyValue) throws CamelExecutionException {
218            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
219            Object result = extractResultBody(exchange, pattern);
220            if (pattern.isOutCapable()) {
221                return result;
222            } else {
223                // return null if not OUT capable
224                return null;
225            }
226        }
227    
228        public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
229            sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
230        }
231    
232        public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
233            Exchange result = send(endpoint, new Processor() {
234                public void process(Exchange exchange) {
235                    Message in = exchange.getIn();
236                    for (Map.Entry<String, Object> header : headers.entrySet()) {
237                        in.setHeader(header.getKey(), header.getValue());
238                    }
239                    in.setBody(body);
240                }
241            });
242            // must invoke extract result body in case of exception to be rethrown
243            extractResultBody(result);
244        }
245    
246        public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) throws CamelExecutionException {
247            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
248        }
249    
250        public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
251            Exchange exchange = send(endpoint, pattern, new Processor() {
252                public void process(Exchange exchange) throws Exception {
253                    Message in = exchange.getIn();
254                    for (Map.Entry<String, Object> header : headers.entrySet()) {
255                        in.setHeader(header.getKey(), header.getValue());
256                    }
257                    in.setBody(body);
258                }
259            });
260            Object result = extractResultBody(exchange, pattern);
261            if (pattern.isOutCapable()) {
262                return result;
263            } else {
264                // return null if not OUT capable
265                return null;
266            }
267        }
268    
269        // Methods using an InOut ExchangePattern
270        // -----------------------------------------------------------------------
271    
272        public Exchange request(Endpoint endpoint, Processor processor) {
273            return send(endpoint, ExchangePattern.InOut, processor);
274        }
275    
276        public Object requestBody(Object body) throws CamelExecutionException {
277            return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
278        }
279    
280        public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException {
281            return sendBody(endpoint, ExchangePattern.InOut, body);
282        }
283    
284        public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException {
285            return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue);
286        }
287    
288        public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
289            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
290        }
291    
292        public Exchange request(String endpoint, Processor processor) throws CamelExecutionException {
293            return send(endpoint, ExchangePattern.InOut, processor);
294        }
295    
296        public Object requestBody(String endpoint, Object body) throws CamelExecutionException {
297            return sendBody(endpoint, ExchangePattern.InOut, body);
298        }
299    
300        public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
301            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
302        }
303    
304        public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
305            return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
306        }
307    
308        public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
309            return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
310        }
311    
312        public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) {
313            return sendBodyAndHeaders(getDefaultEndpoint(), ExchangePattern.InOut, body, headers);
314        }
315    
316        public <T> T requestBody(Object body, Class<T> type) {
317            Object answer = requestBody(body);
318            return camelContext.getTypeConverter().convertTo(type, answer);
319        }
320    
321        public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
322            Object answer = requestBody(endpoint, body);
323            return camelContext.getTypeConverter().convertTo(type, answer);
324        }
325    
326        public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
327            Object answer = requestBody(endpointUri, body);
328            return camelContext.getTypeConverter().convertTo(type, answer);
329        }
330    
331        public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
332            Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
333            return camelContext.getTypeConverter().convertTo(type, answer);
334        }
335    
336        public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
337            Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
338            return camelContext.getTypeConverter().convertTo(type, answer);
339        }
340    
341        public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
342            Object answer = requestBodyAndHeaders(endpointUri, body, headers);
343            return camelContext.getTypeConverter().convertTo(type, answer);
344        }
345    
346        public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
347            Object answer = requestBodyAndHeaders(endpoint, body, headers);
348            return camelContext.getTypeConverter().convertTo(type, answer);
349        }
350    
351        // Methods using the default endpoint
352        // -----------------------------------------------------------------------
353    
354        public void sendBody(Object body) {
355            sendBody(getMandatoryDefaultEndpoint(), body);
356        }
357    
358        public Exchange send(Exchange exchange) {
359            return send(getMandatoryDefaultEndpoint(), exchange);
360        }
361    
362        public Exchange send(Processor processor) {
363            return send(getMandatoryDefaultEndpoint(), processor);
364        }
365    
366        public void sendBodyAndHeader(Object body, String header, Object headerValue) {
367            sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
368        }
369    
370        public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
371            sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
372        }
373    
374        public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
375            sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
376        }
377    
378        // Properties
379        // -----------------------------------------------------------------------
380    
381        /**
382         * @deprecated use {@link #getCamelContext()}
383         */
384        @Deprecated
385        public CamelContext getContext() {
386            return getCamelContext();
387        }
388    
389        public CamelContext getCamelContext() {
390            return camelContext;
391        }
392    
393        public Endpoint getDefaultEndpoint() {
394            return defaultEndpoint;
395        }
396    
397        public void setDefaultEndpoint(Endpoint defaultEndpoint) {
398            this.defaultEndpoint = defaultEndpoint;
399        }
400    
401        /**
402         * Sets the default endpoint to use if none is specified
403         */
404        public void setDefaultEndpointUri(String endpointUri) {
405            setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
406        }
407    
408        /**
409         * @deprecated use {@link CamelContext#getEndpoint(String, Class)}
410         */
411        @Deprecated
412        public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
413            return camelContext.getEndpoint(endpointUri, expectedClass);
414        }
415    
416        // Implementation methods
417        // -----------------------------------------------------------------------
418    
419        protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
420            return new Processor() {
421                public void process(Exchange exchange) {
422                    Message in = exchange.getIn();
423                    in.setHeader(header, headerValue);
424                    in.setBody(body);
425                }
426            };
427        }
428    
429        protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
430            return new Processor() {
431                public void process(Exchange exchange) {
432                    exchange.setProperty(property, propertyValue);
433                    Message in = exchange.getIn();
434                    in.setBody(body);
435                }
436            };
437        }
438    
439        protected Processor createSetBodyProcessor(final Object body) {
440            return new Processor() {
441                public void process(Exchange exchange) {
442                    Message in = exchange.getIn();
443                    in.setBody(body);
444                }
445            };
446        }
447    
448        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
449            Endpoint endpoint = camelContext.getEndpoint(endpointUri);
450            if (endpoint == null) {
451                throw new NoSuchEndpointException(endpointUri);
452            }
453            return endpoint;
454        }
455    
456        protected Endpoint getMandatoryDefaultEndpoint() {
457            Endpoint answer = getDefaultEndpoint();
458            ObjectHelper.notNull(answer, "defaultEndpoint");
459            return answer;
460        }
461    
462        protected Object extractResultBody(Exchange result) {
463            return extractResultBody(result, null);
464        }
465    
466        protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
467            return ExchangeHelper.extractResultBody(result, pattern);
468        }
469    
470        public void setExecutorService(ExecutorService executorService) {
471            this.executor = executorService;
472        }
473    
474        public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
475            return asyncSend(resolveMandatoryEndpoint(uri), exchange);
476        }
477    
478        public Future<Exchange> asyncSend(final String uri, final Processor processor) {
479            return asyncSend(resolveMandatoryEndpoint(uri), processor);
480        }
481    
482        public Future<Object> asyncSendBody(final String uri, final Object body) {
483            return asyncSendBody(resolveMandatoryEndpoint(uri), body);
484        }
485    
486        public Future<Object> asyncRequestBody(final String uri, final Object body) {
487            return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
488        }
489    
490        public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
491            return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
492        }
493    
494        public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
495            return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
496        }
497    
498        public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
499            return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
500        }
501    
502        public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
503            return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
504        }
505    
506        public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
507            return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
508        }
509    
510        public <T> T extractFutureBody(Future<Object> future, Class<T> type) {
511            return ExchangeHelper.extractFutureBody(camelContext, future, type);
512        }
513    
514        public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
515            return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type);
516        }
517    
518        public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
519            return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
520        }
521    
522        public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
523            return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
524        }
525    
526        public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
527            return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
528        }
529    
530        public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
531            return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
532        }
533    
534        public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
535            return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
536        }
537    
538        public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
539            return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
540        }
541    
542        public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
543            Callable<Object> task = new Callable<Object>() {
544                public Object call() throws Exception {
545                    return requestBody(endpoint, body);
546                }
547            };
548            return getExecutorService().submit(task);
549        }
550    
551        public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
552            Callable<T> task = new Callable<T>() {
553                public T call() throws Exception {
554                    return requestBody(endpoint, body, type);
555                }
556            };
557            return getExecutorService().submit(task);
558        }
559    
560        public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
561                                                        final Object headerValue) {
562            Callable<Object> task = new Callable<Object>() {
563                public Object call() throws Exception {
564                    return requestBodyAndHeader(endpoint, body, header, headerValue);
565                }
566            };
567            return getExecutorService().submit(task);
568        }
569    
570        public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
571                                                       final Object headerValue, final Class<T> type) {
572            Callable<T> task = new Callable<T>() {
573                public T call() throws Exception {
574                    return requestBodyAndHeader(endpoint, body, header, headerValue, type);
575                }
576            };
577            return getExecutorService().submit(task);
578        }
579    
580        public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
581                                                         final Map<String, Object> headers) {
582            Callable<Object> task = new Callable<Object>() {
583                public Object call() throws Exception {
584                    return requestBodyAndHeaders(endpoint, body, headers);
585                }
586            };
587            return getExecutorService().submit(task);
588        }
589    
590        public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
591                                                        final Map<String, Object> headers, final Class<T> type) {
592            Callable<T> task = new Callable<T>() {
593                public T call() throws Exception {
594                    return requestBodyAndHeaders(endpoint, body, headers, type);
595                }
596            };
597            return getExecutorService().submit(task);
598        }
599    
600        public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
601            Callable<Exchange> task = new Callable<Exchange>() {
602                public Exchange call() throws Exception {
603                    return send(endpoint, exchange);
604                }
605            };
606            return getExecutorService().submit(task);
607        }
608    
609        public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
610            Callable<Exchange> task = new Callable<Exchange>() {
611                public Exchange call() throws Exception {
612                    return send(endpoint, processor);
613                }
614            };
615            return getExecutorService().submit(task);
616        }
617    
618        public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
619            Callable<Object> task = new Callable<Object>() {
620                public Object call() throws Exception {
621                    sendBody(endpoint, body);
622                    // its InOnly, so no body to return
623                    return null;
624                }
625            };
626            return getExecutorService().submit(task);
627        }
628    
629        private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) {
630            Callable<Object> task = new Callable<Object>() {
631                public Object call() throws Exception {
632                    Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body));
633    
634                    // invoke callback before returning answer
635                    // as it allows callback to be used without unit of work invoking it
636                    // and thus it works directly from a producer template as well, as opposed
637                    // to the unit of work that is injected in routes
638                    if (answer.isFailed()) {
639                        onCompletion.onFailure(answer);
640                    } else {
641                        onCompletion.onComplete(answer);
642                    }
643    
644                    Object result = extractResultBody(answer, pattern);
645                    if (pattern.isOutCapable()) {
646                        return result;
647                    } else {
648                        // return null if not OUT capable
649                        return null;
650                    }
651                }
652            };
653            return getExecutorService().submit(task);
654        }
655    
656        public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
657            Callable<Exchange> task = new Callable<Exchange>() {
658                public Exchange call() throws Exception {
659                    // process the exchange, any exception occurring will be caught and set on the exchange
660                    send(endpoint, exchange);
661    
662                    // invoke callback before returning answer
663                    // as it allows callback to be used without unit of work invoking it
664                    // and thus it works directly from a producer template as well, as opposed
665                    // to the unit of work that is injected in routes
666                    if (exchange.isFailed()) {
667                        onCompletion.onFailure(exchange);
668                    } else {
669                        onCompletion.onComplete(exchange);
670                    }
671                    return exchange;
672                }
673            };
674            return getExecutorService().submit(task);
675        }
676    
677        public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
678            Callable<Exchange> task = new Callable<Exchange>() {
679                public Exchange call() throws Exception {
680                    // process the exchange, any exception occurring will be caught and set on the exchange
681                    Exchange answer = send(endpoint, processor);
682    
683                    // invoke callback before returning answer
684                    // as it allows callback to be used without unit of work invoking it
685                    // and thus it works directly from a producer template as well, as opposed
686                    // to the unit of work that is injected in routes
687                    if (answer.isFailed()) {
688                        onCompletion.onFailure(answer);
689                    } else {
690                        onCompletion.onComplete(answer);
691                    }
692                    return answer;
693                }
694            };
695            return getExecutorService().submit(task);
696        }
697    
698        private ProducerCache getProducerCache() {
699            if (!isStarted()) {
700                throw new IllegalStateException("ProducerTemplate has not been started");
701            }
702            return producerCache;
703        }
704    
705        private ExecutorService getExecutorService() {
706            if (!isStarted()) {
707                throw new IllegalStateException("ProducerTemplate has not been started");
708            }
709    
710            if (executor != null) {
711                return executor;
712            }
713    
714            // create a default executor which must be synchronized
715            synchronized (this) {
716                if (executor != null) {
717                    return executor;
718                }
719                executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate");
720            }
721    
722            ObjectHelper.notNull(executor, "ExecutorService");
723            return executor;
724        }
725    
726        protected void doStart() throws Exception {
727            if (producerCache == null) {
728                if (maximumCacheSize > 0) {
729                    producerCache = new ProducerCache(this, camelContext, maximumCacheSize);
730                } else {
731                    producerCache = new ProducerCache(this, camelContext);
732                }
733                producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
734            }
735            ServiceHelper.startService(producerCache);
736        }
737    
738        protected void doStop() throws Exception {
739            ServiceHelper.stopService(producerCache);
740            producerCache = null;
741    
742            if (executor != null) {
743                camelContext.getExecutorServiceManager().shutdownNow(executor);
744                executor = null;
745            }
746        }
747    
748    }