001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.ConcurrentHashMap;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.TimeoutException;
028    
029    import org.apache.camel.CamelContext;
030    import org.apache.camel.CamelExchangeException;
031    import org.apache.camel.CamelExecutionException;
032    import org.apache.camel.Endpoint;
033    import org.apache.camel.Exchange;
034    import org.apache.camel.ExchangePattern;
035    import org.apache.camel.InvalidPayloadException;
036    import org.apache.camel.Message;
037    import org.apache.camel.MessageHistory;
038    import org.apache.camel.NoSuchBeanException;
039    import org.apache.camel.NoSuchEndpointException;
040    import org.apache.camel.NoSuchHeaderException;
041    import org.apache.camel.NoSuchPropertyException;
042    import org.apache.camel.NoTypeConversionAvailableException;
043    import org.apache.camel.TypeConversionException;
044    import org.apache.camel.TypeConverter;
045    import org.apache.camel.impl.DefaultExchange;
046    import org.apache.camel.spi.UnitOfWork;
047    
048    /**
049     * Some helper methods for working with {@link Exchange} objects
050     *
051     * @version 
052     */
053    public final class ExchangeHelper {
054    
055        /**
056         * Utility classes should not have a public constructor.
057         */
058        private ExchangeHelper() {
059        }
060    
061        /**
062         * Extracts the Exchange.BINDING of the given type or null if not present
063         *
064         * @param exchange the message exchange
065         * @param type     the expected binding type
066         * @return the binding object of the given type or null if it could not be found or converted
067         */
068        public static <T> T getBinding(Exchange exchange, Class<T> type) {
069            return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
070        }
071    
072        /**
073         * Attempts to resolve the endpoint for the given value
074         *
075         * @param exchange the message exchange being processed
076         * @param value    the value which can be an {@link Endpoint} or an object
077         *                 which provides a String representation of an endpoint via
078         *                 {@link #toString()}
079         * @return the endpoint
080         * @throws NoSuchEndpointException if the endpoint cannot be resolved
081         */
082        public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
083            Endpoint endpoint;
084            if (value instanceof Endpoint) {
085                endpoint = (Endpoint) value;
086            } else {
087                String uri = value.toString().trim();
088                endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
089            }
090            return endpoint;
091        }
092    
093        /**
094         * Gets the mandatory property of the exchange of the correct type
095         *
096         * @param exchange      the exchange
097         * @param propertyName  the property name
098         * @param type          the type
099         * @return the property value
100         * @throws TypeConversionException is thrown if error during type conversion
101         * @throws NoSuchPropertyException is thrown if no property exists
102         */
103        public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
104            T result = exchange.getProperty(propertyName, type);
105            if (result != null) {
106                return result;
107            }
108            throw new NoSuchPropertyException(exchange, propertyName, type);
109        }
110    
111        /**
112         * Gets the mandatory inbound header of the correct type
113         *
114         * @param exchange      the exchange
115         * @param headerName    the header name
116         * @param type          the type
117         * @return the header value
118         * @throws TypeConversionException is thrown if error during type conversion
119         * @throws NoSuchHeaderException is thrown if no headers exists
120         */
121        public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
122            T answer = exchange.getIn().getHeader(headerName, type);
123            if (answer == null) {
124                throw new NoSuchHeaderException(exchange, headerName, type);
125            }
126            return answer;
127        }
128    
129        /**
130         * Returns the mandatory inbound message body of the correct type or throws
131         * an exception if it is not present
132         *
133         * @param exchange the exchange
134         * @return the body, is never <tt>null</tt>
135         * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
136         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
137         */
138        @Deprecated
139        public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
140            return exchange.getIn().getMandatoryBody();
141        }
142    
143        /**
144         * Returns the mandatory inbound message body of the correct type or throws
145         * an exception if it is not present
146         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
147         */
148        @Deprecated
149        public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
150            return exchange.getIn().getMandatoryBody(type);
151        }
152    
153        /**
154         * Returns the mandatory outbound message body of the correct type or throws
155         * an exception if it is not present
156         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
157         */
158        @Deprecated
159        public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
160            return exchange.getOut().getMandatoryBody();
161        }
162    
163        /**
164         * Returns the mandatory outbound message body of the correct type or throws
165         * an exception if it is not present
166         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
167         */
168        @Deprecated
169        public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
170            return exchange.getOut().getMandatoryBody(type);
171        }
172    
173        /**
174         * Converts the value to the given expected type or throws an exception
175         *
176         * @return the converted value
177         * @throws TypeConversionException is thrown if error during type conversion
178         * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
179         */
180        public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
181            throws TypeConversionException, NoTypeConversionAvailableException {
182            CamelContext camelContext = exchange.getContext();
183            ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
184            TypeConverter converter = camelContext.getTypeConverter();
185            if (converter != null) {
186                return converter.mandatoryConvertTo(type, exchange, value);
187            }
188            throw new NoTypeConversionAvailableException(value, type);
189        }
190    
191        /**
192         * Converts the value to the given expected type
193         *
194         * @return the converted value
195         * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
196         */
197        public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
198            CamelContext camelContext = exchange.getContext();
199            ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
200            TypeConverter converter = camelContext.getTypeConverter();
201            if (converter != null) {
202                return converter.convertTo(type, exchange, value);
203            }
204            return null;
205        }
206    
207        /**
208         * Creates a new instance and copies from the current message exchange so that it can be
209         * forwarded to another destination as a new instance. Unlike regular copy this operation
210         * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
211         * for async messaging, where the original and copied exchange are independent.
212         *
213         * @param exchange original copy of the exchange
214         * @param handover whether the on completion callbacks should be handed over to the new copy.
215         */
216        public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
217            String id = exchange.getExchangeId();
218    
219            Exchange copy = exchange.copy();
220            // do not share the unit of work
221            copy.setUnitOfWork(null);
222            // hand over on completion to the copy if we got any
223            UnitOfWork uow = exchange.getUnitOfWork();
224            if (handover && uow != null) {
225                uow.handoverSynchronization(copy);
226            }
227            // set a correlation id so we can track back the original exchange
228            copy.setProperty(Exchange.CORRELATION_ID, id);
229            return copy;
230        }
231    
232        /**
233         * Creates a new instance and copies from the current message exchange so that it can be
234         * forwarded to another destination as a new instance.
235         *
236         * @param exchange original copy of the exchange
237         * @param preserveExchangeId whether or not the exchange id should be preserved
238         * @return the copy
239         */
240        public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
241            Exchange copy = exchange.copy();
242            if (preserveExchangeId) {
243                // must preserve exchange id
244                copy.setExchangeId(exchange.getExchangeId());
245            }
246            return copy;
247        }
248    
249        /**
250         * Copies the results of a message exchange from the source exchange to the result exchange
251         * which will copy the message contents, exchange properties and the exception.
252         * Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
253         *
254         * @param result the result exchange which will have the output and error state added
255         * @param source the source exchange which is not modified
256         */
257        public static void copyResults(Exchange result, Exchange source) {
258    
259            // --------------------------------------------------------------------
260            //  TODO: merge logic with that of copyResultsPreservePattern()
261            // --------------------------------------------------------------------
262    
263            if (result == source) {
264                // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
265                // and the result is not failed
266                if (result.getPattern() == ExchangePattern.InOptionalOut) {
267                    // keep as is
268                } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
269                    // copy IN to OUT as we expect a OUT response
270                    result.getOut().copyFrom(source.getIn());
271                }
272                return;
273            }
274    
275            if (result != source) {
276                result.setException(source.getException());
277                if (source.hasOut()) {
278                    result.getOut().copyFrom(source.getOut());
279                } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
280                    // special case where the result is InOptionalOut and with no OUT response
281                    // so we should return null to indicate this fact
282                    result.setOut(null);
283                } else {
284                    // no results so lets copy the last input
285                    // as the final processor on a pipeline might not
286                    // have created any OUT; such as a mock:endpoint
287                    // so lets assume the last IN is the OUT
288                    if (result.getPattern().isOutCapable()) {
289                        // only set OUT if its OUT capable
290                        result.getOut().copyFrom(source.getIn());
291                    } else {
292                        // if not replace IN instead to keep the MEP
293                        result.getIn().copyFrom(source.getIn());
294                        // clear any existing OUT as the result is on the IN
295                        if (result.hasOut()) {
296                            result.setOut(null);
297                        }
298                    }
299                }
300    
301                if (source.hasProperties()) {
302                    result.getProperties().putAll(source.getProperties());
303                }
304            }
305        }
306    
307        /**
308         * Copies the <code>source</code> exchange to <code>target</code> exchange
309         * preserving the {@link ExchangePattern} of <code>target</code>.
310         *
311         * @param source source exchange.
312         * @param result target exchange.
313         */
314        public static void copyResultsPreservePattern(Exchange result, Exchange source) {
315    
316            // --------------------------------------------------------------------
317            //  TODO: merge logic with that of copyResults()
318            // --------------------------------------------------------------------
319    
320            if (result == source) {
321                // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
322                // and the result is not failed
323                if (result.getPattern() == ExchangePattern.InOptionalOut) {
324                    // keep as is
325                } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
326                    // copy IN to OUT as we expect a OUT response
327                    result.getOut().copyFrom(source.getIn());
328                }
329                return;
330            }
331    
332            // copy in message
333            result.getIn().copyFrom(source.getIn());
334    
335            // copy out message
336            if (source.hasOut()) {
337                // exchange pattern sensitive
338                Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
339                resultMessage.copyFrom(source.getOut());
340            }
341    
342            // copy exception
343            result.setException(source.getException());
344    
345            // copy properties
346            if (source.hasProperties()) {
347                result.getProperties().putAll(source.getProperties());
348            }
349        }
350    
351        /**
352         * Returns the message where to write results in an
353         * exchange-pattern-sensitive way.
354         *
355         * @param exchange message exchange.
356         * @return result message.
357         */
358        public static Message getResultMessage(Exchange exchange) {
359            if (exchange.getPattern().isOutCapable()) {
360                return exchange.getOut();
361            } else {
362                return exchange.getIn();
363            }
364        }
365    
366        /**
367         * Returns true if the given exchange pattern (if defined) can support OUT messages
368         *
369         * @param exchange the exchange to interrogate
370         * @return true if the exchange is defined as an {@link ExchangePattern} which supports
371         *         OUT messages
372         */
373        public static boolean isOutCapable(Exchange exchange) {
374            ExchangePattern pattern = exchange.getPattern();
375            return pattern != null && pattern.isOutCapable();
376        }
377    
378        /**
379         * Creates a new instance of the given type from the injector
380         *
381         * @param exchange the exchange
382         * @param type     the given type
383         * @return the created instance of the given type
384         */
385        public static <T> T newInstance(Exchange exchange, Class<T> type) {
386            return exchange.getContext().getInjector().newInstance(type);
387        }
388    
389        /**
390         * Creates a Map of the variables which are made available to a script or template
391         *
392         * @param exchange the exchange to make available
393         * @return a Map populated with the require variables
394         */
395        public static Map<String, Object> createVariableMap(Exchange exchange) {
396            Map<String, Object> answer = new HashMap<String, Object>();
397            populateVariableMap(exchange, answer);
398            return answer;
399        }
400    
401        /**
402         * Populates the Map with the variables which are made available to a script or template
403         *
404         * @param exchange the exchange to make available
405         * @param map      the map to populate
406         */
407        public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
408            map.put("exchange", exchange);
409            Message in = exchange.getIn();
410            map.put("in", in);
411            map.put("request", in);
412            map.put("headers", in.getHeaders());
413            map.put("body", in.getBody());
414            if (isOutCapable(exchange)) {
415                // if we are out capable then set out and response as well
416                // however only grab OUT if it exists, otherwise reuse IN
417                // this prevents side effects to alter the Exchange if we force creating an OUT message
418                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
419                map.put("out", msg);
420                map.put("response", msg);
421            }
422            map.put("camelContext", exchange.getContext());
423        }
424    
425        /**
426         * Returns the MIME content type on the input message or null if one is not defined
427         *
428         * @param exchange the exchange
429         * @return the MIME content type
430         */
431        public static String getContentType(Exchange exchange) {
432            return MessageHelper.getContentType(exchange.getIn());
433        }
434    
435        /**
436         * Returns the MIME content encoding on the input message or null if one is not defined
437         *
438         * @param exchange the exchange
439         * @return the MIME content encoding
440         */
441        public static String getContentEncoding(Exchange exchange) {
442            return MessageHelper.getContentEncoding(exchange.getIn());
443        }
444    
445        /**
446         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
447         *
448         * @param exchange the exchange
449         * @param name     the bean name
450         * @return the bean
451         * @throws NoSuchBeanException if no bean could be found in the registry
452         */
453        public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
454            Object value = lookupBean(exchange, name);
455            if (value == null) {
456                throw new NoSuchBeanException(name);
457            }
458            return value;
459        }
460    
461        /**
462         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
463         *
464         * @param exchange the exchange
465         * @param name     the bean name
466         * @param type     the expected bean type
467         * @return the bean
468         * @throws NoSuchBeanException if no bean could be found in the registry
469         */
470        public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
471            T value = lookupBean(exchange, name, type);
472            if (value == null) {
473                throw new NoSuchBeanException(name);
474            }
475            return value;
476        }
477    
478        /**
479         * Performs a lookup in the registry of the bean name
480         *
481         * @param exchange the exchange
482         * @param name     the bean name
483         * @return the bean, or <tt>null</tt> if no bean could be found
484         */
485        public static Object lookupBean(Exchange exchange, String name) {
486            return exchange.getContext().getRegistry().lookupByName(name);
487        }
488    
489        /**
490         * Performs a lookup in the registry of the bean name and type
491         *
492         * @param exchange the exchange
493         * @param name     the bean name
494         * @param type     the expected bean type
495         * @return the bean, or <tt>null</tt> if no bean could be found
496         */
497        public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
498            return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
499        }
500    
501        /**
502         * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
503         * or null if none could be found
504         *
505         * @param exchanges  the exchanges
506         * @param exchangeId the exchangeId to find
507         * @return matching exchange, or <tt>null</tt> if none found
508         */
509        public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
510            for (Exchange exchange : exchanges) {
511                String id = exchange.getExchangeId();
512                if (id != null && id.equals(exchangeId)) {
513                    return exchange;
514                }
515            }
516            return null;
517        }
518    
519        /**
520         * Prepares the exchanges for aggregation.
521         * <p/>
522         * This implementation will copy the OUT body to the IN body so when you do
523         * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
524         *
525         * @param oldExchange the old exchange
526         * @param newExchange the new exchange
527         */
528        public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
529            // move body/header from OUT to IN
530            if (oldExchange != null) {
531                if (oldExchange.hasOut()) {
532                    oldExchange.setIn(oldExchange.getOut());
533                    oldExchange.setOut(null);
534                }
535            }
536    
537            if (newExchange != null) {
538                if (newExchange.hasOut()) {
539                    newExchange.setIn(newExchange.getOut());
540                    newExchange.setOut(null);
541                }
542            }
543        }
544    
545        /**
546         * Checks whether the exchange has been failure handed
547         *
548         * @param exchange  the exchange
549         * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
550         */
551        public static boolean isFailureHandled(Exchange exchange) {
552            return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
553        }
554    
555        /**
556         * Checks whether the exchange {@link UnitOfWork} is exhausted
557         *
558         * @param exchange  the exchange
559         * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
560         */
561        public static boolean isUnitOfWorkExhausted(Exchange exchange) {
562            return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
563        }
564    
565        /**
566         * Sets the exchange to be failure handled.
567         *
568         * @param exchange  the exchange
569         */
570        public static void setFailureHandled(Exchange exchange) {
571            exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
572            // clear exception since its failure handled
573            exchange.setException(null);
574        }
575    
576        /**
577         * Checks whether the exchange is redelivery exhausted
578         *
579         * @param exchange  the exchange
580         * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
581         */
582        public static boolean isRedeliveryExhausted(Exchange exchange) {
583            return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
584        }
585    
586        /**
587         * Checks whether the exchange {@link UnitOfWork} is redelivered
588         *
589         * @param exchange  the exchange
590         * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
591         */
592        public static boolean isRedelivered(Exchange exchange) {
593            return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
594        }
595    
596        /**
597         * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
598         *
599         * @param exchange  the exchange
600         * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
601         */
602        public static boolean isInterrupted(Exchange exchange) {
603            return exchange.getException(InterruptedException.class) != null;
604        }
605    
606        /**
607         * Extracts the body from the given exchange.
608         * <p/>
609         * If the exchange pattern is provided it will try to honor it and retrieve the body
610         * from either IN or OUT according to the pattern.
611         *
612         * @param exchange the exchange
613         * @param pattern  exchange pattern if given, can be <tt>null</tt>
614         * @return the result body, can be <tt>null</tt>.
615         * @throws CamelExecutionException is thrown if the processing of the exchange failed
616         */
617        public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
618            Object answer = null;
619            if (exchange != null) {
620                // rethrow if there was an exception during execution
621                if (exchange.getException() != null) {
622                    throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
623                }
624    
625                // result could have a fault message
626                if (hasFaultMessage(exchange)) {
627                    return exchange.getOut().getBody();
628                }
629    
630                // okay no fault then return the response according to the pattern
631                // try to honor pattern if provided
632                boolean notOut = pattern != null && !pattern.isOutCapable();
633                boolean hasOut = exchange.hasOut();
634                if (hasOut && !notOut) {
635                    // we have a response in out and the pattern is out capable
636                    answer = exchange.getOut().getBody();
637                } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
638                    // special case where the result is InOptionalOut and with no OUT response
639                    // so we should return null to indicate this fact
640                    answer = null;
641                } else {
642                    // use IN as the response
643                    answer = exchange.getIn().getBody();
644                }
645            }
646            return answer;
647        }
648    
649        /**
650         * Tests whether the exchange has a fault message set and that its not null.
651         *
652         * @param exchange the exchange
653         * @return <tt>true</tt> if fault message exists
654         */
655        public static boolean hasFaultMessage(Exchange exchange) {
656            return exchange.hasOut() && exchange.getOut().isFault() && exchange.getOut().getBody() != null;
657        }
658    
659        /**
660         * Tests whether the exchange has already been handled by the error handler
661         *
662         * @param exchange the exchange
663         * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
664         */
665        public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
666            return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
667        }
668    
669        /**
670         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
671         * <p/>
672         * Will wait until the future task is complete.
673         *
674         * @param context the camel context
675         * @param future  the future handle
676         * @param type    the expected body response type
677         * @return the result body, can be <tt>null</tt>.
678         * @throws CamelExecutionException is thrown if the processing of the exchange failed
679         */
680        public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) {
681            try {
682                return doExtractFutureBody(context, future.get(), type);
683            } catch (InterruptedException e) {
684                throw ObjectHelper.wrapRuntimeCamelException(e);
685            } catch (ExecutionException e) {
686                // execution failed due to an exception so rethrow the cause
687                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
688            } finally {
689                // its harmless to cancel if task is already completed
690                // and in any case we do not want to get hold of the task a 2nd time
691                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
692                future.cancel(true);
693            }
694        }
695    
696        /**
697         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
698         * <p/>
699         * Will wait for the future task to complete, but waiting at most the timeout value.
700         *
701         * @param context the camel context
702         * @param future  the future handle
703         * @param timeout timeout value
704         * @param unit    timeout unit
705         * @param type    the expected body response type
706         * @return the result body, can be <tt>null</tt>.
707         * @throws CamelExecutionException is thrown if the processing of the exchange failed
708         * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
709         */
710        public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
711            try {
712                if (timeout > 0) {
713                    return doExtractFutureBody(context, future.get(timeout, unit), type);
714                } else {
715                    return doExtractFutureBody(context, future.get(), type);
716                }
717            } catch (InterruptedException e) {
718                // execution failed due interruption so rethrow the cause
719                throw ObjectHelper.wrapCamelExecutionException(null, e);
720            } catch (ExecutionException e) {
721                // execution failed due to an exception so rethrow the cause
722                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
723            } finally {
724                // its harmless to cancel if task is already completed
725                // and in any case we do not want to get hold of the task a 2nd time
726                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
727                future.cancel(true);
728            }
729        }
730    
731        private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
732            if (result == null) {
733                return null;
734            }
735            if (type.isAssignableFrom(result.getClass())) {
736                return type.cast(result);
737            }
738            if (result instanceof Exchange) {
739                Exchange exchange = (Exchange) result;
740                Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
741                return context.getTypeConverter().convertTo(type, exchange, answer);
742            }
743            return context.getTypeConverter().convertTo(type, result);
744        }
745    
746        /**
747         * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
748         */
749        @Deprecated
750        public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
751            return CamelExchangeException.createExceptionMessage(message, exchange, cause);
752        }
753    
754        /**
755         * Strategy to prepare results before next iterator or when we are complete,
756         * which is done by copying OUT to IN, so there is only an IN as input
757         * for the next iteration.
758         *
759         * @param exchange the exchange to prepare
760         */
761        public static void prepareOutToIn(Exchange exchange) {
762            // we are routing using pipes and filters so we need to manually copy OUT to IN
763            if (exchange.hasOut()) {
764                exchange.getIn().copyFrom(exchange.getOut());
765                exchange.setOut(null);
766            }
767        }
768    
769        /**
770         * Gets both the messageId and exchangeId to be used for logging purposes.
771         * <p/>
772         * Logging both ids, can help to correlate exchanges which may be redelivered messages
773         * from for example a JMS broker.
774         *
775         * @param exchange the exchange
776         * @return a log message with both the messageId and exchangeId
777         */
778        public static String logIds(Exchange exchange) {
779            String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
780            return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
781        }
782    
783        /**
784         * Copies the exchange but the copy will be tied to the given context
785         *
786         * @param exchange  the source exchange
787         * @param context   the camel context
788         * @return a copy with the given camel context
789         */
790        public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
791            return copyExchangeAndSetCamelContext(exchange, context, true);
792        }
793    
794        /**
795         * Copies the exchange but the copy will be tied to the given context
796         *
797         * @param exchange  the source exchange
798         * @param context   the camel context
799         * @param handover  whether to handover on completions from the source to the copy
800         * @return a copy with the given camel context
801         */
802        public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
803            DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
804            if (exchange.hasProperties()) {
805                answer.setProperties(safeCopy(exchange.getProperties()));
806            }
807            if (handover) {
808                // Need to hand over the completion for async invocation
809                exchange.handoverCompletions(answer);
810            }
811            answer.setIn(exchange.getIn().copy());
812            if (exchange.hasOut()) {
813                answer.setOut(exchange.getOut().copy());
814            }
815            answer.setException(exchange.getException());
816            return answer;
817        }
818    
819        @SuppressWarnings("unchecked")
820        private static Map<String, Object> safeCopy(Map<String, Object> properties) {
821            if (properties == null) {
822                return null;
823            }
824    
825            Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties);
826    
827            // safe copy message history using a defensive copy
828            List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
829            if (history != null) {
830                answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
831            }
832    
833            return answer;
834        }
835    }