001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.Map; 020 import java.util.concurrent.Callable; 021 import java.util.concurrent.ExecutorService; 022 import java.util.concurrent.Future; 023 import java.util.concurrent.TimeUnit; 024 import java.util.concurrent.TimeoutException; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.CamelExecutionException; 028 import org.apache.camel.Endpoint; 029 import org.apache.camel.Exchange; 030 import org.apache.camel.ExchangePattern; 031 import org.apache.camel.Message; 032 import org.apache.camel.NoSuchEndpointException; 033 import org.apache.camel.Processor; 034 import org.apache.camel.ProducerTemplate; 035 import org.apache.camel.spi.Synchronization; 036 import org.apache.camel.support.ServiceSupport; 037 import org.apache.camel.util.CamelContextHelper; 038 import org.apache.camel.util.ExchangeHelper; 039 import org.apache.camel.util.ObjectHelper; 040 import org.apache.camel.util.ServiceHelper; 041 042 /** 043 * Template (named like Spring's TransactionTemplate & JmsTemplate 044 * et al) for working with Camel and sending {@link Message} instances in an 045 * {@link Exchange} to an {@link Endpoint}. 046 * 047 * @version 048 */ 049 public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate { 050 private final CamelContext camelContext; 051 private volatile ProducerCache producerCache; 052 private volatile ExecutorService executor; 053 private Endpoint defaultEndpoint; 054 private int maximumCacheSize; 055 private boolean eventNotifierEnabled = true; 056 057 public DefaultProducerTemplate(CamelContext camelContext) { 058 this.camelContext = camelContext; 059 } 060 061 public DefaultProducerTemplate(CamelContext camelContext, ExecutorService executor) { 062 this.camelContext = camelContext; 063 this.executor = executor; 064 } 065 066 public DefaultProducerTemplate(CamelContext camelContext, Endpoint defaultEndpoint) { 067 this(camelContext); 068 this.defaultEndpoint = defaultEndpoint; 069 } 070 071 public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) { 072 Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri); 073 return new DefaultProducerTemplate(camelContext, endpoint); 074 } 075 076 public int getMaximumCacheSize() { 077 return maximumCacheSize; 078 } 079 080 public void setMaximumCacheSize(int maximumCacheSize) { 081 this.maximumCacheSize = maximumCacheSize; 082 } 083 084 public int getCurrentCacheSize() { 085 if (producerCache == null) { 086 return 0; 087 } 088 return producerCache.size(); 089 } 090 091 public boolean isEventNotifierEnabled() { 092 return eventNotifierEnabled; 093 } 094 095 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 096 this.eventNotifierEnabled = eventNotifierEnabled; 097 // if we already created the cache then adjust its setting as well 098 if (producerCache != null) { 099 producerCache.setEventNotifierEnabled(eventNotifierEnabled); 100 } 101 } 102 103 public Exchange send(String endpointUri, Exchange exchange) { 104 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 105 return send(endpoint, exchange); 106 } 107 108 public Exchange send(String endpointUri, Processor processor) { 109 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 110 return send(endpoint, processor); 111 } 112 113 public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) { 114 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 115 return send(endpoint, pattern, processor); 116 } 117 118 public Exchange send(Endpoint endpoint, Exchange exchange) { 119 getProducerCache().send(endpoint, exchange); 120 return exchange; 121 } 122 123 public Exchange send(Endpoint endpoint, Processor processor) { 124 return getProducerCache().send(endpoint, processor); 125 } 126 127 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 128 return getProducerCache().send(endpoint, pattern, processor); 129 } 130 131 public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) { 132 Exchange result = send(endpoint, pattern, createSetBodyProcessor(body)); 133 return extractResultBody(result, pattern); 134 } 135 136 public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException { 137 Exchange result = send(endpoint, createSetBodyProcessor(body)); 138 // must invoke extract result body in case of exception to be rethrown 139 extractResultBody(result); 140 } 141 142 public void sendBody(String endpointUri, Object body) throws CamelExecutionException { 143 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 144 sendBody(endpoint, body); 145 } 146 147 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException { 148 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 149 Object result = sendBody(endpoint, pattern, body); 150 if (pattern.isOutCapable()) { 151 return result; 152 } else { 153 // return null if not OUT capable 154 return null; 155 } 156 } 157 158 public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) throws CamelExecutionException { 159 sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); 160 } 161 162 public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) throws CamelExecutionException { 163 Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue)); 164 // must invoke extract result body in case of exception to be rethrown 165 extractResultBody(result); 166 } 167 168 public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body, 169 final String header, final Object headerValue) throws CamelExecutionException { 170 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 171 Object result = extractResultBody(exchange, pattern); 172 if (pattern.isOutCapable()) { 173 return result; 174 } else { 175 // return null if not OUT capable 176 return null; 177 } 178 } 179 180 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, 181 final String header, final Object headerValue) throws CamelExecutionException { 182 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue)); 183 Object result = extractResultBody(exchange, pattern); 184 if (pattern.isOutCapable()) { 185 return result; 186 } else { 187 // return null if not OUT capable 188 return null; 189 } 190 } 191 192 public void sendBodyAndProperty(String endpointUri, final Object body, 193 final String property, final Object propertyValue) throws CamelExecutionException { 194 sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue); 195 } 196 197 public void sendBodyAndProperty(Endpoint endpoint, final Object body, 198 final String property, final Object propertyValue) throws CamelExecutionException { 199 Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue)); 200 // must invoke extract result body in case of exception to be rethrown 201 extractResultBody(result); 202 } 203 204 public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body, 205 final String property, final Object propertyValue) throws CamelExecutionException { 206 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue)); 207 Object result = extractResultBody(exchange, pattern); 208 if (pattern.isOutCapable()) { 209 return result; 210 } else { 211 // return null if not OUT capable 212 return null; 213 } 214 } 215 216 public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body, 217 final String property, final Object propertyValue) throws CamelExecutionException { 218 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue)); 219 Object result = extractResultBody(exchange, pattern); 220 if (pattern.isOutCapable()) { 221 return result; 222 } else { 223 // return null if not OUT capable 224 return null; 225 } 226 } 227 228 public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) throws CamelExecutionException { 229 sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 230 } 231 232 public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException { 233 Exchange result = send(endpoint, new Processor() { 234 public void process(Exchange exchange) { 235 Message in = exchange.getIn(); 236 for (Map.Entry<String, Object> header : headers.entrySet()) { 237 in.setHeader(header.getKey(), header.getValue()); 238 } 239 in.setBody(body); 240 } 241 }); 242 // must invoke extract result body in case of exception to be rethrown 243 extractResultBody(result); 244 } 245 246 public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) throws CamelExecutionException { 247 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers); 248 } 249 250 public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException { 251 Exchange exchange = send(endpoint, pattern, new Processor() { 252 public void process(Exchange exchange) throws Exception { 253 Message in = exchange.getIn(); 254 for (Map.Entry<String, Object> header : headers.entrySet()) { 255 in.setHeader(header.getKey(), header.getValue()); 256 } 257 in.setBody(body); 258 } 259 }); 260 Object result = extractResultBody(exchange, pattern); 261 if (pattern.isOutCapable()) { 262 return result; 263 } else { 264 // return null if not OUT capable 265 return null; 266 } 267 } 268 269 // Methods using an InOut ExchangePattern 270 // ----------------------------------------------------------------------- 271 272 public Exchange request(Endpoint endpoint, Processor processor) { 273 return send(endpoint, ExchangePattern.InOut, processor); 274 } 275 276 public Object requestBody(Object body) throws CamelExecutionException { 277 return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body); 278 } 279 280 public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException { 281 return sendBody(endpoint, ExchangePattern.InOut, body); 282 } 283 284 public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException { 285 return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue); 286 } 287 288 public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) throws CamelExecutionException { 289 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 290 } 291 292 public Exchange request(String endpoint, Processor processor) throws CamelExecutionException { 293 return send(endpoint, ExchangePattern.InOut, processor); 294 } 295 296 public Object requestBody(String endpoint, Object body) throws CamelExecutionException { 297 return sendBody(endpoint, ExchangePattern.InOut, body); 298 } 299 300 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) throws CamelExecutionException { 301 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue); 302 } 303 304 public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) { 305 return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 306 } 307 308 public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) { 309 return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers); 310 } 311 312 public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) { 313 return sendBodyAndHeaders(getDefaultEndpoint(), ExchangePattern.InOut, body, headers); 314 } 315 316 public <T> T requestBody(Object body, Class<T> type) { 317 Object answer = requestBody(body); 318 return camelContext.getTypeConverter().convertTo(type, answer); 319 } 320 321 public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) { 322 Object answer = requestBody(endpoint, body); 323 return camelContext.getTypeConverter().convertTo(type, answer); 324 } 325 326 public <T> T requestBody(String endpointUri, Object body, Class<T> type) { 327 Object answer = requestBody(endpointUri, body); 328 return camelContext.getTypeConverter().convertTo(type, answer); 329 } 330 331 public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) { 332 Object answer = requestBodyAndHeader(endpoint, body, header, headerValue); 333 return camelContext.getTypeConverter().convertTo(type, answer); 334 } 335 336 public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) { 337 Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue); 338 return camelContext.getTypeConverter().convertTo(type, answer); 339 } 340 341 public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) { 342 Object answer = requestBodyAndHeaders(endpointUri, body, headers); 343 return camelContext.getTypeConverter().convertTo(type, answer); 344 } 345 346 public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) { 347 Object answer = requestBodyAndHeaders(endpoint, body, headers); 348 return camelContext.getTypeConverter().convertTo(type, answer); 349 } 350 351 // Methods using the default endpoint 352 // ----------------------------------------------------------------------- 353 354 public void sendBody(Object body) { 355 sendBody(getMandatoryDefaultEndpoint(), body); 356 } 357 358 public Exchange send(Exchange exchange) { 359 return send(getMandatoryDefaultEndpoint(), exchange); 360 } 361 362 public Exchange send(Processor processor) { 363 return send(getMandatoryDefaultEndpoint(), processor); 364 } 365 366 public void sendBodyAndHeader(Object body, String header, Object headerValue) { 367 sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue); 368 } 369 370 public void sendBodyAndProperty(Object body, String property, Object propertyValue) { 371 sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue); 372 } 373 374 public void sendBodyAndHeaders(Object body, Map<String, Object> headers) { 375 sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers); 376 } 377 378 // Properties 379 // ----------------------------------------------------------------------- 380 381 /** 382 * @deprecated use {@link #getCamelContext()} 383 */ 384 @Deprecated 385 public CamelContext getContext() { 386 return getCamelContext(); 387 } 388 389 public CamelContext getCamelContext() { 390 return camelContext; 391 } 392 393 public Endpoint getDefaultEndpoint() { 394 return defaultEndpoint; 395 } 396 397 public void setDefaultEndpoint(Endpoint defaultEndpoint) { 398 this.defaultEndpoint = defaultEndpoint; 399 } 400 401 /** 402 * Sets the default endpoint to use if none is specified 403 */ 404 public void setDefaultEndpointUri(String endpointUri) { 405 setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri)); 406 } 407 408 /** 409 * @deprecated use {@link CamelContext#getEndpoint(String, Class)} 410 */ 411 @Deprecated 412 public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) { 413 return camelContext.getEndpoint(endpointUri, expectedClass); 414 } 415 416 // Implementation methods 417 // ----------------------------------------------------------------------- 418 419 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) { 420 return new Processor() { 421 public void process(Exchange exchange) { 422 Message in = exchange.getIn(); 423 in.setHeader(header, headerValue); 424 in.setBody(body); 425 } 426 }; 427 } 428 429 protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) { 430 return new Processor() { 431 public void process(Exchange exchange) { 432 exchange.setProperty(property, propertyValue); 433 Message in = exchange.getIn(); 434 in.setBody(body); 435 } 436 }; 437 } 438 439 protected Processor createSetBodyProcessor(final Object body) { 440 return new Processor() { 441 public void process(Exchange exchange) { 442 Message in = exchange.getIn(); 443 in.setBody(body); 444 } 445 }; 446 } 447 448 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 449 Endpoint endpoint = camelContext.getEndpoint(endpointUri); 450 if (endpoint == null) { 451 throw new NoSuchEndpointException(endpointUri); 452 } 453 return endpoint; 454 } 455 456 protected Endpoint getMandatoryDefaultEndpoint() { 457 Endpoint answer = getDefaultEndpoint(); 458 ObjectHelper.notNull(answer, "defaultEndpoint"); 459 return answer; 460 } 461 462 protected Object extractResultBody(Exchange result) { 463 return extractResultBody(result, null); 464 } 465 466 protected Object extractResultBody(Exchange result, ExchangePattern pattern) { 467 return ExchangeHelper.extractResultBody(result, pattern); 468 } 469 470 public void setExecutorService(ExecutorService executorService) { 471 this.executor = executorService; 472 } 473 474 public Future<Exchange> asyncSend(final String uri, final Exchange exchange) { 475 return asyncSend(resolveMandatoryEndpoint(uri), exchange); 476 } 477 478 public Future<Exchange> asyncSend(final String uri, final Processor processor) { 479 return asyncSend(resolveMandatoryEndpoint(uri), processor); 480 } 481 482 public Future<Object> asyncSendBody(final String uri, final Object body) { 483 return asyncSendBody(resolveMandatoryEndpoint(uri), body); 484 } 485 486 public Future<Object> asyncRequestBody(final String uri, final Object body) { 487 return asyncRequestBody(resolveMandatoryEndpoint(uri), body); 488 } 489 490 public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { 491 return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type); 492 } 493 494 public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { 495 return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); 496 } 497 498 public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { 499 return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type); 500 } 501 502 public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { 503 return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); 504 } 505 506 public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { 507 return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type); 508 } 509 510 public <T> T extractFutureBody(Future<Object> future, Class<T> type) { 511 return ExchangeHelper.extractFutureBody(camelContext, future, type); 512 } 513 514 public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 515 return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type); 516 } 517 518 public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) { 519 return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion); 520 } 521 522 public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) { 523 return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion); 524 } 525 526 public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) { 527 return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion); 528 } 529 530 public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) { 531 return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion); 532 } 533 534 public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) { 535 return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion); 536 } 537 538 public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) { 539 return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion); 540 } 541 542 public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) { 543 Callable<Object> task = new Callable<Object>() { 544 public Object call() throws Exception { 545 return requestBody(endpoint, body); 546 } 547 }; 548 return getExecutorService().submit(task); 549 } 550 551 public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) { 552 Callable<T> task = new Callable<T>() { 553 public T call() throws Exception { 554 return requestBody(endpoint, body, type); 555 } 556 }; 557 return getExecutorService().submit(task); 558 } 559 560 public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, 561 final Object headerValue) { 562 Callable<Object> task = new Callable<Object>() { 563 public Object call() throws Exception { 564 return requestBodyAndHeader(endpoint, body, header, headerValue); 565 } 566 }; 567 return getExecutorService().submit(task); 568 } 569 570 public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, 571 final Object headerValue, final Class<T> type) { 572 Callable<T> task = new Callable<T>() { 573 public T call() throws Exception { 574 return requestBodyAndHeader(endpoint, body, header, headerValue, type); 575 } 576 }; 577 return getExecutorService().submit(task); 578 } 579 580 public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, 581 final Map<String, Object> headers) { 582 Callable<Object> task = new Callable<Object>() { 583 public Object call() throws Exception { 584 return requestBodyAndHeaders(endpoint, body, headers); 585 } 586 }; 587 return getExecutorService().submit(task); 588 } 589 590 public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, 591 final Map<String, Object> headers, final Class<T> type) { 592 Callable<T> task = new Callable<T>() { 593 public T call() throws Exception { 594 return requestBodyAndHeaders(endpoint, body, headers, type); 595 } 596 }; 597 return getExecutorService().submit(task); 598 } 599 600 public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) { 601 Callable<Exchange> task = new Callable<Exchange>() { 602 public Exchange call() throws Exception { 603 return send(endpoint, exchange); 604 } 605 }; 606 return getExecutorService().submit(task); 607 } 608 609 public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) { 610 Callable<Exchange> task = new Callable<Exchange>() { 611 public Exchange call() throws Exception { 612 return send(endpoint, processor); 613 } 614 }; 615 return getExecutorService().submit(task); 616 } 617 618 public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) { 619 Callable<Object> task = new Callable<Object>() { 620 public Object call() throws Exception { 621 sendBody(endpoint, body); 622 // its InOnly, so no body to return 623 return null; 624 } 625 }; 626 return getExecutorService().submit(task); 627 } 628 629 private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) { 630 Callable<Object> task = new Callable<Object>() { 631 public Object call() throws Exception { 632 Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body)); 633 634 // invoke callback before returning answer 635 // as it allows callback to be used without unit of work invoking it 636 // and thus it works directly from a producer template as well, as opposed 637 // to the unit of work that is injected in routes 638 if (answer.isFailed()) { 639 onCompletion.onFailure(answer); 640 } else { 641 onCompletion.onComplete(answer); 642 } 643 644 Object result = extractResultBody(answer, pattern); 645 if (pattern.isOutCapable()) { 646 return result; 647 } else { 648 // return null if not OUT capable 649 return null; 650 } 651 } 652 }; 653 return getExecutorService().submit(task); 654 } 655 656 public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) { 657 Callable<Exchange> task = new Callable<Exchange>() { 658 public Exchange call() throws Exception { 659 // process the exchange, any exception occurring will be caught and set on the exchange 660 send(endpoint, exchange); 661 662 // invoke callback before returning answer 663 // as it allows callback to be used without unit of work invoking it 664 // and thus it works directly from a producer template as well, as opposed 665 // to the unit of work that is injected in routes 666 if (exchange.isFailed()) { 667 onCompletion.onFailure(exchange); 668 } else { 669 onCompletion.onComplete(exchange); 670 } 671 return exchange; 672 } 673 }; 674 return getExecutorService().submit(task); 675 } 676 677 public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) { 678 Callable<Exchange> task = new Callable<Exchange>() { 679 public Exchange call() throws Exception { 680 // process the exchange, any exception occurring will be caught and set on the exchange 681 Exchange answer = send(endpoint, processor); 682 683 // invoke callback before returning answer 684 // as it allows callback to be used without unit of work invoking it 685 // and thus it works directly from a producer template as well, as opposed 686 // to the unit of work that is injected in routes 687 if (answer.isFailed()) { 688 onCompletion.onFailure(answer); 689 } else { 690 onCompletion.onComplete(answer); 691 } 692 return answer; 693 } 694 }; 695 return getExecutorService().submit(task); 696 } 697 698 private ProducerCache getProducerCache() { 699 if (!isStarted()) { 700 throw new IllegalStateException("ProducerTemplate has not been started"); 701 } 702 return producerCache; 703 } 704 705 private ExecutorService getExecutorService() { 706 if (!isStarted()) { 707 throw new IllegalStateException("ProducerTemplate has not been started"); 708 } 709 710 if (executor != null) { 711 return executor; 712 } 713 714 // create a default executor which must be synchronized 715 synchronized (this) { 716 if (executor != null) { 717 return executor; 718 } 719 executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"); 720 } 721 722 ObjectHelper.notNull(executor, "ExecutorService"); 723 return executor; 724 } 725 726 protected void doStart() throws Exception { 727 if (producerCache == null) { 728 if (maximumCacheSize > 0) { 729 producerCache = new ProducerCache(this, camelContext, maximumCacheSize); 730 } else { 731 producerCache = new ProducerCache(this, camelContext); 732 } 733 producerCache.setEventNotifierEnabled(isEventNotifierEnabled()); 734 } 735 ServiceHelper.startService(producerCache); 736 } 737 738 protected void doStop() throws Exception { 739 ServiceHelper.stopService(producerCache); 740 producerCache = null; 741 742 if (executor != null) { 743 camelContext.getExecutorServiceManager().shutdownNow(executor); 744 executor = null; 745 } 746 } 747 748 }