001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Navigate;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Traceable;
029    import org.apache.camel.support.ServiceSupport;
030    import org.apache.camel.util.AsyncProcessorConverterHelper;
031    import org.apache.camel.util.AsyncProcessorHelper;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.ServiceHelper;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Implements try/catch/finally type processing
039     *
040     * @version 
041     */
042    public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
043        private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class);
044    
045        protected final Processor tryProcessor;
046        protected final List<Processor> catchClauses;
047        protected final Processor finallyProcessor;
048    
049        public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) {
050            this.tryProcessor = tryProcessor;
051            this.catchClauses = catchClauses;
052            this.finallyProcessor = finallyProcessor;
053        }
054    
055        public String toString() {
056            String catchText = catchClauses == null || catchClauses.isEmpty() ? "" : " Catches {" + catchClauses + "}";
057            String finallyText = (finallyProcessor == null) ? "" : " Finally {" + finallyProcessor + "}";
058            return "Try {" + tryProcessor + "}" + catchText + finallyText;
059        }
060    
061        public String getTraceLabel() {
062            return "doTry";
063        }
064    
065        public void process(Exchange exchange) throws Exception {
066            AsyncProcessorHelper.process(this, exchange);
067        }
068    
069        public boolean process(Exchange exchange, AsyncCallback callback) {
070            Iterator<Processor> processors = next().iterator();
071    
072            Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
073            exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
074    
075            while (continueRouting(processors, exchange)) {
076                exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
077                ExchangeHelper.prepareOutToIn(exchange);
078    
079                // process the next processor
080                Processor processor = processors.next();
081                AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
082                boolean sync = process(exchange, callback, processors, async, lastHandled);
083    
084                // continue as long its being processed synchronously
085                if (!sync) {
086                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
087                    // the remainder of the try .. catch .. finally will be completed async
088                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
089                    return false;
090                }
091    
092                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
093            }
094    
095            ExchangeHelper.prepareOutToIn(exchange);
096            exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
097            exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
098            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
099            callback.done(true);
100            return true;
101        }
102    
103        protected boolean process(final Exchange exchange, final AsyncCallback callback,
104                                  final Iterator<Processor> processors, final AsyncProcessor processor,
105                                  final Object lastHandled) {
106            // this does the actual processing so log at trace level
107            LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
108    
109            // implement asynchronous routing logic in callback so we can have the callback being
110            // triggered and then continue routing where we left
111            boolean sync = processor.process(exchange, new AsyncCallback() {
112                public void done(boolean doneSync) {
113                    // we only have to handle async completion of the pipeline
114                    if (doneSync) {
115                        return;
116                    }
117    
118                    // continue processing the try .. catch .. finally asynchronously
119                    while (continueRouting(processors, exchange)) {
120                        exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
121                        ExchangeHelper.prepareOutToIn(exchange);
122    
123                        // process the next processor
124                        AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
125                        doneSync = process(exchange, callback, processors, processor, lastHandled);
126    
127                        if (!doneSync) {
128                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
129                            // the remainder of the try .. catch .. finally will be completed async
130                            // so we break out now, then the callback will be invoked which then continue routing from where we left here
131                            return;
132                        }
133                    }
134    
135                    ExchangeHelper.prepareOutToIn(exchange);
136                    exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
137                    exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
138                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
139                    callback.done(false);
140                }
141            });
142    
143            return sync;
144        }
145    
146        protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
147            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
148            if (stop != null) {
149                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
150                if (doStop) {
151                    LOG.debug("Exchange is marked to stop routing: {}", exchange);
152                    return false;
153                }
154            }
155    
156            // continue if there are more processors to route
157            return it.hasNext();
158        }
159    
160        protected void doStart() throws Exception {
161            ServiceHelper.startServices(tryProcessor, catchClauses, finallyProcessor);
162        }
163    
164        protected void doStop() throws Exception {
165            ServiceHelper.stopServices(tryProcessor, catchClauses, finallyProcessor);
166        }
167    
168        public List<Processor> next() {
169            if (!hasNext()) {
170                return null;
171            }
172            List<Processor> answer = new ArrayList<Processor>();
173            if (tryProcessor != null) {
174                answer.add(tryProcessor);
175            }
176            if (catchClauses != null) {
177                answer.addAll(catchClauses);
178            }
179            if (finallyProcessor != null) {
180                answer.add(finallyProcessor);
181            }
182            return answer;
183        }
184    
185        public boolean hasNext() {
186            return tryProcessor != null || catchClauses != null && !catchClauses.isEmpty() || finallyProcessor != null;
187        }
188    
189    }