001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.io.InputStream;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.CamelContextAware;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Message;
027    import org.apache.camel.RuntimeCamelException;
028    import org.apache.camel.Traceable;
029    import org.apache.camel.spi.DataFormat;
030    import org.apache.camel.support.ServiceSupport;
031    import org.apache.camel.util.AsyncProcessorHelper;
032    import org.apache.camel.util.IOHelper;
033    import org.apache.camel.util.ObjectHelper;
034    import org.apache.camel.util.ServiceHelper;
035    
036    /**
037     * Unmarshals the body of the incoming message using the given
038     * <a href="http://camel.apache.org/data-format.html">data format</a>
039     *
040     * @version 
041     */
042    public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor, Traceable, CamelContextAware {
043        private CamelContext camelContext;
044        private final DataFormat dataFormat;
045    
046        public UnmarshalProcessor(DataFormat dataFormat) {
047            this.dataFormat = dataFormat;
048        }
049    
050        public void process(Exchange exchange) throws Exception {
051            AsyncProcessorHelper.process(this, exchange);
052        }
053    
054        public boolean process(Exchange exchange, AsyncCallback callback) {
055            ObjectHelper.notNull(dataFormat, "dataFormat");
056    
057            InputStream stream = null;
058            try {
059                stream = exchange.getIn().getMandatoryBody(InputStream.class);
060    
061                // lets setup the out message before we invoke the dataFormat so that it can mutate it if necessary
062                Message out = exchange.getOut();
063                out.copyFrom(exchange.getIn());
064    
065                Object result = dataFormat.unmarshal(exchange, stream);
066                if (result instanceof Exchange) {
067                    if (result != exchange) {
068                        // it's not allowed to return another exchange other than the one provided to dataFormat
069                        throw new RuntimeCamelException("The returned exchange " + result + " is not the same as " + exchange + " provided to the DataFormat");
070                    }
071                } else if (result instanceof Message) {
072                    // the dataformat has probably set headers, attachments, etc. so let's use it as the outbound payload
073                    exchange.setOut((Message) result);
074                } else {
075                    out.setBody(result);
076                }
077            } catch (Exception e) {
078                // remove OUT message, as an exception occurred
079                exchange.setOut(null);
080                exchange.setException(e);
081            } finally {
082                IOHelper.close(stream, "input stream");
083            }
084    
085            callback.done(true);
086            return true;
087        }
088    
089        public String toString() {
090            return "Unmarshal[" + dataFormat + "]";
091        }
092    
093        public String getTraceLabel() {
094            return "unmarshal[" + dataFormat + "]";
095        }
096    
097        public CamelContext getCamelContext() {
098            return camelContext;
099        }
100    
101        public void setCamelContext(CamelContext camelContext) {
102            this.camelContext = camelContext;
103        }
104    
105        @Override
106        protected void doStart() throws Exception {
107            // inject CamelContext on data format
108            if (dataFormat instanceof CamelContextAware) {
109                ((CamelContextAware) dataFormat).setCamelContext(camelContext);
110            }
111            ServiceHelper.startService(dataFormat);
112        }
113    
114        @Override
115        protected void doStop() throws Exception {
116            ServiceHelper.stopService(dataFormat);
117        }
118    
119    }