001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.io.File;
020    import java.io.Serializable;
021    import java.util.Collection;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    
025    import org.apache.camel.Exchange;
026    import org.apache.camel.RuntimeExchangeException;
027    import org.apache.camel.WrappedFile;
028    import org.apache.camel.util.ObjectHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Holder object for sending an exchange over a remote wire as a serialized object.
034     * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
035     * <p/>
036     * Note: Message body of type {@link File} or {@link WrappedFile} is <b>not</b> supported and
037     * a {@link RuntimeExchangeException} is thrown.
038     * <p/>
039     * As opposed to normal usage where only the body part of the exchange is transferred over the wire,
040     * this holder object serializes the following fields over the wire:
041     * <ul>
042     * <li>exchangeId</li>
043     * <li>in body</li>
044     * <li>out body</li>
045     * <li>in headers</li>
046     * <li>out headers</li>
047     * <li>fault body </li>
048     * <li>fault headers</li>
049     * <li>exchange properties</li>
050     * <li>exception</li>
051     * </ul>
052     * Any object that is not serializable will be skipped and Camel will log this at WARN level.
053     *
054     * @version 
055     */
056    public class DefaultExchangeHolder implements Serializable {
057    
058        private static final long serialVersionUID = 2L;
059        private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeHolder.class);
060    
061        private String exchangeId;
062        private Object inBody;
063        private Object outBody;
064        private Boolean outFaultFlag = Boolean.FALSE;
065        private Map<String, Object> inHeaders;
066        private Map<String, Object> outHeaders;
067        private Map<String, Object> properties;
068        private Exception exception;
069    
070        /**
071         * Creates a payload object with the information from the given exchange.
072         *
073         * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
074         * @return the holder object with information copied form the exchange
075         */
076        public static DefaultExchangeHolder marshal(Exchange exchange) {
077            return marshal(exchange, true);
078        }
079    
080        /**
081         * Creates a payload object with the information from the given exchange.
082         *
083         * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
084         * @param includeProperties whether or not to include exchange properties
085         * @return the holder object with information copied form the exchange
086         */
087        public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
088            ObjectHelper.notNull(exchange, "exchange");
089    
090            // we do not support files
091            Object body = exchange.getIn().getBody();
092            if (body instanceof WrappedFile || body instanceof File) {
093                throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange);
094            }
095    
096            DefaultExchangeHolder payload = new DefaultExchangeHolder();
097    
098            payload.exchangeId = exchange.getExchangeId();
099            payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
100            payload.safeSetInHeaders(exchange);
101            if (exchange.hasOut()) {
102                payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody());
103                payload.outFaultFlag = exchange.getOut().isFault();
104                payload.safeSetOutHeaders(exchange);
105            }
106            if (includeProperties) {
107                payload.safeSetProperties(exchange);
108            }
109            payload.exception = exchange.getException();
110    
111            return payload;
112        }
113    
114        /**
115         * Transfers the information from the payload to the exchange.
116         *
117         * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt>
118         * @param payload  the payload with the values, must <b>not</b> be <tt>null</tt>
119         */
120        public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
121            ObjectHelper.notNull(exchange, "exchange");
122            ObjectHelper.notNull(payload, "payload");
123    
124            exchange.setExchangeId(payload.exchangeId);
125            exchange.getIn().setBody(payload.inBody);
126            if (payload.inHeaders != null) {
127                exchange.getIn().setHeaders(payload.inHeaders);
128            }
129            if (payload.outBody != null) {
130                exchange.getOut().setBody(payload.outBody);
131                if (payload.outHeaders != null) {
132                    exchange.getOut().setHeaders(payload.outHeaders);
133                }
134                if (payload.outFaultFlag != null) {
135                    exchange.getOut().setFault(payload.outFaultFlag);
136                }
137            }
138            if (payload.properties != null) {
139                for (String key : payload.properties.keySet()) {
140                    exchange.setProperty(key, payload.properties.get(key));
141                }
142            }
143            exchange.setException(payload.exception);
144        }
145    
146        /**
147         * Adds a property to the payload.
148         * <p/>
149         * This can be done in special situations where additional information must be added which was not provided
150         * from the source.
151         *
152         * @param payload the serialized payload
153         * @param key the property key to add
154         * @param property the property value to add
155         */
156        public static void addProperty(DefaultExchangeHolder payload, String key, Serializable property) {
157            if (key == null || property == null) {
158                return;
159            }
160            if (payload.properties == null) {
161                payload.properties = new LinkedHashMap<String, Object>();
162            }
163            payload.properties.put(key, property);
164        }
165    
166        public String toString() {
167            StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId);
168            sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
169            sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
170            sb.append(", properties=").append(properties).append(", exception=").append(exception);
171            return sb.append(']').toString();
172        }
173    
174        private Map<String, Object> safeSetInHeaders(Exchange exchange) {
175            if (exchange.getIn().hasHeaders()) {
176                Map<String, Object> map = checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders());
177                if (map != null && !map.isEmpty()) {
178                    inHeaders = new LinkedHashMap<String, Object>(map);
179                }
180            }
181            return null;
182        }
183    
184        private Map<String, Object> safeSetOutHeaders(Exchange exchange) {
185            if (exchange.hasOut() && exchange.getOut().hasHeaders()) {
186                Map<String, Object> map = checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders());
187                if (map != null && !map.isEmpty()) {
188                    outHeaders = new LinkedHashMap<String, Object>(map);
189                }
190            }
191            return null;
192        }
193    
194        private Map<String, Object> safeSetProperties(Exchange exchange) {
195            if (exchange.hasProperties()) {
196                Map<String, Object> map = checkMapSerializableObjects("properties", exchange, exchange.getProperties());
197                if (map != null && !map.isEmpty()) {
198                    properties = new LinkedHashMap<String, Object>(map);
199                }
200            }
201            return null;
202        }
203    
204        private static Object checkSerializableBody(String type, Exchange exchange, Object object) {
205            if (object == null) {
206                return null;
207            }
208    
209            Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object);
210            if (converted != null) {
211                return converted;
212            } else {
213                LOG.warn("Exchange " + type + " containing object: " + object + " of type: " + object.getClass().getCanonicalName() + " cannot be serialized, it will be excluded by the holder.");
214                return null;
215            }
216        }
217    
218        private static Map<String, Object> checkMapSerializableObjects(String type, Exchange exchange, Map<String, Object> map) {
219            if (map == null) {
220                return null;
221            }
222    
223            Map<String, Object> result = new LinkedHashMap<String, Object>();
224            for (Map.Entry<String, Object> entry : map.entrySet()) {
225    
226                // silently skip any values which is null
227                if (entry.getValue() != null) {
228                    Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, entry.getValue());
229    
230                    // if the converter is a map/collection we need to check its content as well
231                    if (converted instanceof Collection) {
232                        Collection<?> valueCol = (Collection<?>) converted;
233                        if (!collectionContainsAllSerializableObjects(valueCol, exchange)) {
234                            logCannotSerializeObject(type, entry.getKey(), entry.getValue());
235                            continue;
236                        }
237                    } else if (converted instanceof Map) {
238                        Map<?, ?> valueMap = (Map<?, ?>) converted;
239                        if (!mapContainsAllSerializableObjects(valueMap, exchange)) {
240                            logCannotSerializeObject(type, entry.getKey(), entry.getValue());
241                            continue;
242                        }
243                    }
244    
245                    if (converted != null) {
246                        result.put(entry.getKey(), converted);
247                    } else {
248                        logCannotSerializeObject(type, entry.getKey(), entry.getValue());
249                    }
250                }
251            }
252    
253            return result;
254        }
255    
256        private static void logCannotSerializeObject(String type, String key, Object value) {
257            if (key.startsWith("Camel")) {
258                // log Camel at DEBUG level
259                if (LOG.isDebugEnabled()) {
260                    LOG.debug("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder."
261                              , new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
262                }
263            } else {
264                // log regular at WARN level
265                LOG.warn("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder."
266                         , new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
267            }
268        }
269    
270        private static boolean collectionContainsAllSerializableObjects(Collection<?> col, Exchange exchange) {
271            for (Object value : col) {
272                if (value != null) {
273                    Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value);
274                    if (converted == null) {
275                        return false;
276                    }
277                }
278            }
279            return true;
280        }
281    
282        private static boolean mapContainsAllSerializableObjects(Map<?, ?> map, Exchange exchange) {
283            for (Object value : map.values()) {
284                if (value != null) {
285                    Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value);
286                    if (converted == null) {
287                        return false;
288                    }
289                }
290            }
291            return true;
292        }
293    
294    }