/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
import org.apache.camel.MessageHistory;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.NoSuchHeaderException;
import org.apache.camel.NoSuchPropertyException;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.UnitOfWork;

/**
 * Some helper methods for working with {@link Exchange} objects
 *
 * @version 
 */
public final class ExchangeHelper {

    /**
     * Utility classes should not have a public constructor.
     */
    private ExchangeHelper() {
    }

    /**
     * Extracts the Exchange.BINDING of the given type or null if not present
     *
     * @param exchange the message exchange, may be <tt>null</tt>
     * @param type     the expected binding type
     * @return the binding object of the given type or null if it could not be found or converted
     */
    public static <T> T getBinding(Exchange exchange, Class<T> type) {
        return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
    }

    /**
     * Attempts to resolve the endpoint for the given value
     *
     * @param exchange the message exchange being processed
     * @param value    the value which can be an {@link Endpoint} or an object
     *                 which provides a String representation of an endpoint via
     *                 {@link Object#toString()}
     * @return the endpoint
     * @throws NoSuchEndpointException if the endpoint cannot be resolved
     */
    public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
        Endpoint endpoint;
        if (value instanceof Endpoint) {
            endpoint = (Endpoint) value;
        } else {
            // the trimmed String form of the value is used as the endpoint uri
            String uri = value.toString().trim();
            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
        }
        return endpoint;
    }

    /**
     * Gets the mandatory property of the exchange of the correct type
     *
     * @param exchange      the exchange
     * @param propertyName  the property name
     * @param type          the type
     * @return the property value
     * @throws TypeConversionException is thrown if error during type conversion
     * @throws NoSuchPropertyException is thrown if no property exists
     */
    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
        T result = exchange.getProperty(propertyName, type);
        if (result != null) {
            return result;
        }
        throw new NoSuchPropertyException(exchange, propertyName, type);
    }

    /**
     * Gets the mandatory inbound header of the correct type
     *
     * @param exchange      the exchange
     * @param headerName    the header name
     * @param type          the type
     * @return the header value
     * @throws TypeConversionException is thrown if error during type conversion
     * @throws NoSuchHeaderException is thrown if no headers exists
     */
    public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
        T answer = exchange.getIn().getHeader(headerName, type);
        if (answer == null) {
            throw new NoSuchHeaderException(exchange, headerName, type);
        }
        return answer;
    }

    /**
     * Returns the mandatory inbound message body of the correct type or throws
     * an exception if it is not present
     *
     * @param exchange the exchange
     * @return the body, is never <tt>null</tt>
     * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
     */
    @Deprecated
    public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
        return exchange.getIn().getMandatoryBody();
    }

    /**
     * Returns the mandatory inbound message body of the correct type or throws
     * an exception if it is not present
     *
     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
     */
    @Deprecated
    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
        return exchange.getIn().getMandatoryBody(type);
    }

    /**
     * Returns the mandatory outbound message body of the correct type or throws
     * an exception if it is not present
     *
     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
     */
    @Deprecated
    public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
        return exchange.getOut().getMandatoryBody();
    }

    /**
     * Returns the mandatory outbound message body of the correct type or throws
     * an exception if it is not present
     *
     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
     */
    @Deprecated
    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
        return exchange.getOut().getMandatoryBody(type);
    }

    /**
     * Converts the value to the given expected type or throws an exception
     *
     * @param exchange the exchange, whose CamelContext must not be <tt>null</tt>
     * @param type     the type to convert to
     * @param value    the value to convert
     * @return the converted value
     * @throws TypeConversionException is thrown if error during type conversion
     * @throws NoTypeConversionAvailableException if no type converters exists to convert to the given type
     */
    public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
        throws TypeConversionException, NoTypeConversionAvailableException {
        CamelContext camelContext = exchange.getContext();
        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
        TypeConverter converter = camelContext.getTypeConverter();
        if (converter != null) {
            return converter.mandatoryConvertTo(type, exchange, value);
        }
        throw new NoTypeConversionAvailableException(value, type);
    }

    /**
     * Converts the value to the given expected type
     *
     * @param exchange the exchange, whose CamelContext must not be <tt>null</tt>
     * @param type     the type to convert to
     * @param value    the value to convert
     * @return the converted value, or <tt>null</tt> if the CamelContext has no type converter
     * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
     */
    public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
        CamelContext camelContext = exchange.getContext();
        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
        TypeConverter converter = camelContext.getTypeConverter();
        if (converter != null) {
            return converter.convertTo(type, exchange, value);
        }
        return null;
    }

    /**
     * Creates a new instance and copies from the current message exchange so that it can be
     * forwarded to another destination as a new instance. Unlike regular copy this operation
     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so it should be used
     * for async messaging, where the original and copied exchange are independent.
     *
     * @param exchange original copy of the exchange
     * @param handover whether the on completion callbacks should be handed over to the new copy.
     * @return the copy, with {@link Exchange#CORRELATION_ID} set to the id of the original exchange
     */
    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
        String id = exchange.getExchangeId();

        Exchange copy = exchange.copy();
        // do not share the unit of work
        copy.setUnitOfWork(null);
        // hand over on completion to the copy if we got any
        UnitOfWork uow = exchange.getUnitOfWork();
        if (handover && uow != null) {
            uow.handoverSynchronization(copy);
        }
        // set a correlation id so we can track back the original exchange
        copy.setProperty(Exchange.CORRELATION_ID, id);
        return copy;
    }

    /**
     * Creates a new instance and copies from the current message exchange so that it can be
     * forwarded to another destination as a new instance.
     *
     * @param exchange original copy of the exchange
     * @param preserveExchangeId whether or not the exchange id should be preserved
     * @return the copy
     */
    public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
        Exchange copy = exchange.copy();
        if (preserveExchangeId) {
            // must preserve exchange id
            copy.setExchangeId(exchange.getExchangeId());
        }
        return copy;
    }

    /**
     * Copies the results of a message exchange from the source exchange to the result exchange
     * which will copy the message contents, exchange properties and the exception.
     * Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
     *
     * @param result the result exchange which will have the output and error state added
     * @param source the source exchange which is not modified
     */
    public static void copyResults(Exchange result, Exchange source) {

        // --------------------------------------------------------------------
        //  TODO: merge remaining logic with that of copyResultsPreservePattern()
        // --------------------------------------------------------------------

        if (result == source) {
            // same instance, so there is nothing to copy between exchanges
            copyInToOutIfExpected(result);
            return;
        }

        result.setException(source.getException());
        if (source.hasOut()) {
            result.getOut().copyFrom(source.getOut());
        } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
            // special case where the result is InOptionalOut and with no OUT response
            // so we should return null to indicate this fact
            result.setOut(null);
        } else if (result.getPattern().isOutCapable()) {
            // no results so lets copy the last input
            // as the final processor on a pipeline might not
            // have created any OUT; such as a mock:endpoint
            // so lets assume the last IN is the OUT
            // (only set OUT if its OUT capable)
            result.getOut().copyFrom(source.getIn());
        } else {
            // if not OUT capable then replace IN instead to keep the MEP
            result.getIn().copyFrom(source.getIn());
            // clear any existing OUT as the result is on the IN
            if (result.hasOut()) {
                result.setOut(null);
            }
        }

        if (source.hasProperties()) {
            result.getProperties().putAll(source.getProperties());
        }
    }

    /**
     * Copies the <code>source</code> exchange to <code>target</code> exchange
     * preserving the {@link ExchangePattern} of <code>target</code>.
     *
     * @param result target exchange.
     * @param source source exchange.
     */
    public static void copyResultsPreservePattern(Exchange result, Exchange source) {

        // --------------------------------------------------------------------
        //  TODO: merge remaining logic with that of copyResults()
        // --------------------------------------------------------------------

        if (result == source) {
            // same instance, so there is nothing to copy between exchanges
            copyInToOutIfExpected(result);
            return;
        }

        // copy in message
        result.getIn().copyFrom(source.getIn());

        // copy out message
        if (source.hasOut()) {
            // exchange pattern sensitive: a fault always goes to OUT
            Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
            resultMessage.copyFrom(source.getOut());
        }

        // copy exception
        result.setException(source.getException());

        // copy properties
        if (source.hasProperties()) {
            result.getProperties().putAll(source.getProperties());
        }
    }

    /**
     * Ensures the MEP of a single exchange is as expected when result and source are the
     * same instance: copies IN to OUT if the pattern is out capable (and not InOptionalOut),
     * there is no OUT yet and the exchange is not failed. Otherwise the exchange is kept as is.
     *
     * @param exchange the exchange acting as both result and source
     */
    private static void copyInToOutIfExpected(Exchange exchange) {
        ExchangePattern pattern = exchange.getPattern();
        // InOptionalOut is kept as is
        if (pattern != ExchangePattern.InOptionalOut
                && pattern.isOutCapable() && !exchange.hasOut() && !exchange.isFailed()) {
            // copy IN to OUT as we expect a OUT response
            exchange.getOut().copyFrom(exchange.getIn());
        }
    }

    /**
     * Returns the message where to write results in an
     * exchange-pattern-sensitive way.
     *
     * @param exchange message exchange.
     * @return result message.
     */
    public static Message getResultMessage(Exchange exchange) {
        if (exchange.getPattern().isOutCapable()) {
            return exchange.getOut();
        } else {
            return exchange.getIn();
        }
    }

    /**
     * Returns true if the given exchange pattern (if defined) can support OUT messages
     *
     * @param exchange the exchange to interrogate
     * @return true if the exchange is defined as an {@link ExchangePattern} which supports
     *         OUT messages
     */
    public static boolean isOutCapable(Exchange exchange) {
        ExchangePattern pattern = exchange.getPattern();
        return pattern != null && pattern.isOutCapable();
    }

    /**
     * Creates a new instance of the given type from the injector
     *
     * @param exchange the exchange
     * @param type     the given type
     * @return the created instance of the given type
     */
    public static <T> T newInstance(Exchange exchange, Class<T> type) {
        return exchange.getContext().getInjector().newInstance(type);
    }

    /**
     * Creates a Map of the variables which are made available to a script or template
     *
     * @param exchange the exchange to make available
     * @return a Map populated with the require variables
     */
    public static Map<String, Object> createVariableMap(Exchange exchange) {
        Map<String, Object> answer = new HashMap<String, Object>();
        populateVariableMap(exchange, answer);
        return answer;
    }

    /**
     * Populates the Map with the variables which are made available to a script or template
     *
     * @param exchange the exchange to make available
     * @param map      the map to populate
     */
    public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
        map.put("exchange", exchange);
        Message in = exchange.getIn();
        map.put("in", in);
        map.put("request", in);
        map.put("headers", in.getHeaders());
        map.put("body", in.getBody());
        if (isOutCapable(exchange)) {
            // if we are out capable then set out and response as well
            // however only grab OUT if it exists, otherwise reuse IN
            // this prevents side effects to alter the Exchange if we force creating an OUT message
            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
            map.put("out", msg);
            map.put("response", msg);
        }
        map.put("camelContext", exchange.getContext());
    }

    /**
     * Returns the MIME content type on the input message or null if one is not defined
     *
     * @param exchange the exchange
     * @return the MIME content type
     */
    public static String getContentType(Exchange exchange) {
        return MessageHelper.getContentType(exchange.getIn());
    }

    /**
     * Returns the MIME content encoding on the input message or null if one is not defined
     *
     * @param exchange the exchange
     * @return the MIME content encoding
     */
    public static String getContentEncoding(Exchange exchange) {
        return MessageHelper.getContentEncoding(exchange.getIn());
    }

    /**
     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
     *
     * @param exchange the exchange
     * @param name     the bean name
     * @return the bean
     * @throws NoSuchBeanException if no bean could be found in the registry
     */
    public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
        Object value = lookupBean(exchange, name);
        if (value == null) {
            throw new NoSuchBeanException(name);
        }
        return value;
    }

    /**
     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
     *
     * @param exchange the exchange
     * @param name     the bean name
     * @param type     the expected bean type
     * @return the bean
     * @throws NoSuchBeanException if no bean could be found in the registry
     */
    public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
        T value = lookupBean(exchange, name, type);
        if (value == null) {
            throw new NoSuchBeanException(name);
        }
        return value;
    }

    /**
     * Performs a lookup in the registry of the bean name
     *
     * @param exchange the exchange
     * @param name     the bean name
     * @return the bean, or <tt>null</tt> if no bean could be found
     */
    public static Object lookupBean(Exchange exchange, String name) {
        return exchange.getContext().getRegistry().lookupByName(name);
    }

    /**
     * Performs a lookup in the registry of the bean name and type
     *
     * @param exchange the exchange
     * @param name     the bean name
     * @param type     the expected bean type
     * @return the bean, or <tt>null</tt> if no bean could be found
     */
    public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
        return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
    }

    /**
     * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
     * or null if none could be found
     *
     * @param exchanges  the exchanges
     * @param exchangeId the exchangeId to find
     * @return matching exchange, or <tt>null</tt> if none found
     */
    public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
        for (Exchange exchange : exchanges) {
            String id = exchange.getExchangeId();
            if (id != null && id.equals(exchangeId)) {
                return exchange;
            }
        }
        return null;
    }

    /**
     * Prepares the exchanges for aggregation.
     * <p/>
     * This implementation will move the OUT message to the IN message so when you do
     * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
     *
     * @param oldExchange the old exchange, may be <tt>null</tt>
     * @param newExchange the new exchange, may be <tt>null</tt>
     */
    public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
        // move body/header from OUT to IN
        if (oldExchange != null) {
            if (oldExchange.hasOut()) {
                oldExchange.setIn(oldExchange.getOut());
                oldExchange.setOut(null);
            }
        }

        if (newExchange != null) {
            if (newExchange.hasOut()) {
                newExchange.setIn(newExchange.getOut());
                newExchange.setOut(null);
            }
        }
    }

    /**
     * Checks whether the exchange has been failure handled
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
     */
    public static boolean isFailureHandled(Exchange exchange) {
        return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
    }

    /**
     * Checks whether the exchange {@link UnitOfWork} is exhausted
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
     */
    public static boolean isUnitOfWorkExhausted(Exchange exchange) {
        return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
    }

    /**
     * Sets the exchange to be failure handled.
     *
     * @param exchange  the exchange
     */
    public static void setFailureHandled(Exchange exchange) {
        exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
        // clear exception since its failure handled
        exchange.setException(null);
    }

    /**
     * Checks whether the exchange is redelivery exhausted
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
     */
    public static boolean isRedeliveryExhausted(Exchange exchange) {
        return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
    }

    /**
     * Checks whether the exchange is marked as redelivered, according to the
     * {@link Exchange#REDELIVERED} header on the IN message
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
     */
    public static boolean isRedelivered(Exchange exchange) {
        return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
    }

    /**
     * Checks whether the exchange has been interrupted during processing, that is whether
     * it holds an {@link InterruptedException}
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
     */
    public static boolean isInterrupted(Exchange exchange) {
        return exchange.getException(InterruptedException.class) != null;
    }

    /**
     * Extracts the body from the given exchange.
     * <p/>
     * If the exchange pattern is provided it will try to honor it and retrieve the body
     * from either IN or OUT according to the pattern.
     *
     * @param exchange the exchange, may be <tt>null</tt>
     * @param pattern  exchange pattern if given, can be <tt>null</tt>
     * @return the result body, can be <tt>null</tt>.
     * @throws CamelExecutionException is thrown if the processing of the exchange failed
     */
    public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
        Object answer = null;
        if (exchange != null) {
            // rethrow if there was an exception during execution
            if (exchange.getException() != null) {
                throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
            }

            // result could have a fault message
            if (hasFaultMessage(exchange)) {
                return exchange.getOut().getBody();
            }

            // okay no fault then return the response according to the pattern
            // try to honor pattern if provided
            boolean notOut = pattern != null && !pattern.isOutCapable();
            boolean hasOut = exchange.hasOut();
            if (hasOut && !notOut) {
                // we have a response in out and the pattern is out capable
                answer = exchange.getOut().getBody();
            } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
                // special case where the result is InOptionalOut and with no OUT response
                // so we should return null to indicate this fact
                answer = null;
            } else {
                // use IN as the response
                answer = exchange.getIn().getBody();
            }
        }
        return answer;
    }

    /**
     * Tests whether the exchange has a fault message set and that its not null.
     *
     * @param exchange the exchange
     * @return <tt>true</tt> if fault message exists
     */
    public static boolean hasFaultMessage(Exchange exchange) {
        return exchange.hasOut() && exchange.getOut().isFault() && exchange.getOut().getBody() != null;
    }

    /**
     * Tests whether the exchange has already been handled by the error handler
     *
     * @param exchange the exchange
     * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
     */
    public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
        return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
    }

    /**
     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
     * <p/>
     * Will wait until the future task is complete.
     * <p/>
     * If the calling thread is interrupted while waiting, the interrupt status of the thread
     * is restored before the exception is rethrown.
     *
     * @param context the camel context
     * @param future  the future handle
     * @param type    the expected body response type
     * @return the result body, can be <tt>null</tt>.
     * @throws CamelExecutionException is thrown if the processing of the exchange failed
     */
    public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) {
        try {
            return doExtractFutureBody(context, future.get(), type);
        } catch (InterruptedException e) {
            // restore the interrupt status so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw ObjectHelper.wrapRuntimeCamelException(e);
        } catch (ExecutionException e) {
            // execution failed due to an exception so rethrow the cause
            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
        } finally {
            // its harmless to cancel if task is already completed
            // and in any case we do not want to get hold of the task a 2nd time
            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
            future.cancel(true);
        }
    }

    /**
     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
     * <p/>
     * Will wait for the future task to complete, but waiting at most the timeout value.
     * A timeout value of zero or less means waiting without a timeout.
     * <p/>
     * If the calling thread is interrupted while waiting, the interrupt status of the thread
     * is restored before the exception is rethrown.
     *
     * @param context the camel context
     * @param future  the future handle
     * @param timeout timeout value
     * @param unit    timeout unit
     * @param type    the expected body response type
     * @return the result body, can be <tt>null</tt>.
     * @throws CamelExecutionException is thrown if the processing of the exchange failed
     * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
     */
    public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
        try {
            if (timeout > 0) {
                return doExtractFutureBody(context, future.get(timeout, unit), type);
            } else {
                return doExtractFutureBody(context, future.get(), type);
            }
        } catch (InterruptedException e) {
            // restore the interrupt status so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            // execution failed due interruption so rethrow the cause
            throw ObjectHelper.wrapCamelExecutionException(null, e);
        } catch (ExecutionException e) {
            // execution failed due to an exception so rethrow the cause
            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
        } finally {
            // its harmless to cancel if task is already completed
            // and in any case we do not want to get hold of the task a 2nd time
            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
            future.cancel(true);
        }
    }

    /**
     * Converts the outcome of a completed future to the expected type.
     *
     * @param context the camel context providing the type converter
     * @param result  the value the future completed with, can be <tt>null</tt>
     * @param type    the expected body response type
     * @return the result as the given type, can be <tt>null</tt>
     */
    private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
        if (result == null) {
            return null;
        }
        // already of the expected type (this also covers the caller asking for the Exchange itself)
        if (type.isInstance(result)) {
            return type.cast(result);
        }
        if (result instanceof Exchange) {
            // extract the body from the exchange according to its pattern, and convert that
            Exchange exchange = (Exchange) result;
            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
            return context.getTypeConverter().convertTo(type, exchange, answer);
        }
        return context.getTypeConverter().convertTo(type, result);
    }

    /**
     * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
     */
    @Deprecated
    public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
        return CamelExchangeException.createExceptionMessage(message, exchange, cause);
    }

    /**
     * Strategy to prepare results before next iterator or when we are complete,
     * which is done by copying OUT to IN, so there is only an IN as input
     * for the next iteration.
     *
     * @param exchange the exchange to prepare
     */
    public static void prepareOutToIn(Exchange exchange) {
        // we are routing using pipes and filters so we need to manually copy OUT to IN
        if (exchange.hasOut()) {
            exchange.getIn().copyFrom(exchange.getOut());
            exchange.setOut(null);
        }
    }

    /**
     * Gets both the messageId and exchangeId to be used for logging purposes.
     * <p/>
     * Logging both ids, can help to correlate exchanges which may be redelivered messages
     * from for example a JMS broker.
     *
     * @param exchange the exchange
     * @return a log message with both the messageId and exchangeId
     */
    public static String logIds(Exchange exchange) {
        String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
        return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
    }

    /**
     * Copies the exchange but the copy will be tied to the given context
     *
     * @param exchange  the source exchange
     * @param context   the camel context
     * @return a copy with the given camel context
     */
    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
        return copyExchangeAndSetCamelContext(exchange, context, true);
    }

    /**
     * Copies the exchange but the copy will be tied to the given context
     *
     * @param exchange  the source exchange
     * @param context   the camel context
     * @param handover  whether to handover on completions from the source to the copy
     * @return a copy with the given camel context
     */
    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
        if (exchange.hasProperties()) {
            answer.setProperties(safeCopy(exchange.getProperties()));
        }
        if (handover) {
            // Need to hand over the completion for async invocation
            exchange.handoverCompletions(answer);
        }
        answer.setIn(exchange.getIn().copy());
        if (exchange.hasOut()) {
            answer.setOut(exchange.getOut().copy());
        }
        answer.setException(exchange.getException());
        return answer;
    }

    /**
     * Copies the given exchange properties into a new map, replacing the message history
     * (if any) with its own list copy.
     *
     * @param properties the properties to copy, can be <tt>null</tt>
     * @return the copy, or <tt>null</tt> if the given properties was <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> safeCopy(Map<String, Object> properties) {
        if (properties == null) {
            return null;
        }

        Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties);

        // safe copy message history using a defensive copy
        List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
        if (history != null) {
            answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
        }

        return answer;
    }
}