001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Predicate;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Traceable;
027    import org.apache.camel.util.ExchangeHelper;
028    import org.apache.camel.util.ObjectHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A processor which catches exceptions.
034     *
035     * @version 
036     */
037    public class CatchProcessor extends DelegateAsyncProcessor implements Traceable {
038        private static final Logger LOG = LoggerFactory.getLogger(CatchProcessor.class);
039    
040        private final List<Class<? extends Throwable>> exceptions;
041        private final Predicate onWhen;
042        private final Predicate handled;
043    
044        public CatchProcessor(List<Class<? extends Throwable>> exceptions, Processor processor, Predicate onWhen, Predicate handled) {
045            super(processor);
046            this.exceptions = exceptions;
047            this.onWhen = onWhen;
048            this.handled = handled;
049        }
050    
051        @Override
052        public String toString() {
053            return "Catch[" + exceptions + " -> " + getProcessor() + "]";
054        }
055    
056        public String getTraceLabel() {
057            return "catch";
058        }
059    
060        @Override
061        public boolean process(final Exchange exchange, final AsyncCallback callback) {
062            Exception e = exchange.getException();
063            Throwable caught = catches(exchange, e);
064            // If a previous catch clause handled the exception or if this clause does not match, exit
065            if (exchange.getProperty(Exchange.EXCEPTION_HANDLED) != null || caught == null) {
066                callback.done(true);
067                return true;
068            }
069            if (LOG.isTraceEnabled()) {
070                LOG.trace("This CatchProcessor catches the exception: {} caused by: {}", caught.getClass().getName(), e.getMessage());
071            }
072    
073            // store the last to endpoint as the failure endpoint
074            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
075                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
076            }
077            // give the rest of the pipeline another chance
078            exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
079            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
080            exchange.setException(null);
081            // and we should not be regarded as exhausted as we are in a try .. catch block
082            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
083    
084            // is the exception handled by the catch clause
085            final boolean handled = handles(exchange);
086    
087            if (LOG.isDebugEnabled()) {
088                LOG.debug("The exception is handled: {} for the exception: {} caused by: {}",
089                        new Object[]{handled, e.getClass().getName(), e.getMessage()});
090            }
091    
092            boolean sync = processor.process(exchange, new AsyncCallback() {
093                public void done(boolean doneSync) {
094                    if (!handled) {
095                        if (exchange.getException() == null) {
096                            exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
097                        }
098                    }
099                    // always clear redelivery exhausted in a catch clause
100                    exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
101    
102                    if (!doneSync) {
103                        // signal callback to continue routing async
104                        ExchangeHelper.prepareOutToIn(exchange);
105                    }
106    
107                    callback.done(doneSync);
108                }
109            });
110    
111            return sync;
112        }
113    
114        /**
115         * Returns with the exception that is caught by this processor.
116         *
117         * This method traverses exception causes, so sometimes the exception
118         * returned from this method might be one of causes of the parameter
119         * passed.
120         *
121         * @param exchange  the current exchange
122         * @param exception the thrown exception
123         * @return Throwable that this processor catches. <tt>null</tt> if nothing matches.
124         */
125        protected Throwable catches(Exchange exchange, Throwable exception) {
126            // use the exception iterator to walk the caused by hierarchy
127            Iterator<Throwable> it = ObjectHelper.createExceptionIterator(exception);
128            while (it.hasNext()) {
129                Throwable e = it.next();
130                // see if we catch this type
131                for (Class<?> type : exceptions) {
132                    if (type.isInstance(e) && matchesWhen(exchange)) {
133                        return e;
134                    }
135                }
136            }
137    
138            // not found
139            return null;
140        }
141    
142        /**
143         * Whether this catch processor handles the exception it have caught
144         *
145         * @param exchange  the current exchange
146         * @return <tt>true</tt> if this processor handles it, <tt>false</tt> otherwise.
147         */
148        protected boolean handles(Exchange exchange) {
149            if (handled == null) {
150                // handle by default
151                return true;
152            }
153    
154            return handled.matches(exchange);
155        }
156    
157        public List<Class<? extends Throwable>> getExceptions() {
158            return exceptions;
159        }
160    
161        /**
162         * Strategy method for matching the exception type with the current exchange.
163         * <p/>
164         * This default implementation will match as:
165         * <ul>
166         *   <li>Always true if no when predicate on the exception type
167         *   <li>Otherwise the when predicate is matches against the current exchange
168         * </ul>
169         *
170         * @param exchange the current {@link org.apache.camel.Exchange}
171         * @return <tt>true</tt> if matched, <tt>false</tt> otherwise.
172         */
173        protected boolean matchesWhen(Exchange exchange) {
174            if (onWhen == null) {
175                // if no predicate then it's always a match
176                return true;
177            }
178            return onWhen.matches(exchange);
179        }
180    }