001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.loadbalancer;
018    
019    import java.util.List;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.atomic.AtomicInteger;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.CamelContextAware;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Processor;
029    import org.apache.camel.Traceable;
030    import org.apache.camel.util.AsyncProcessorConverterHelper;
031    import org.apache.camel.util.ExchangeHelper;
032    import org.apache.camel.util.ObjectHelper;
033    
034    /**
035     * This FailOverLoadBalancer will failover to use next processor when an exception occurred
036     * <p/>
037     * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
038     * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
039     * pipeline to ensure it works the same and the async routing engine is flawless.
040     */
041    public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
042    
043        private final List<Class<?>> exceptions;
044        private CamelContext camelContext;
045        private boolean roundRobin;
046        private int maximumFailoverAttempts = -1;
047    
048        // stateful counter
049        private final AtomicInteger counter = new AtomicInteger(-1);
050    
051        public FailOverLoadBalancer() {
052            this.exceptions = null;
053        }
054    
055        public FailOverLoadBalancer(List<Class<?>> exceptions) {
056            this.exceptions = exceptions;
057    
058            // validate its all exception types
059            for (Class<?> type : exceptions) {
060                if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
061                    throw new IllegalArgumentException("Class is not an instance of Throwable: " + type);
062                }
063            }
064        }
065    
066        @Override
067        public CamelContext getCamelContext() {
068            return camelContext;
069        }
070    
071        @Override
072        public void setCamelContext(CamelContext camelContext) {
073            this.camelContext = camelContext;
074        }
075    
076        public List<Class<?>> getExceptions() {
077            return exceptions;
078        }
079    
080        public boolean isRoundRobin() {
081            return roundRobin;
082        }
083    
084        public void setRoundRobin(boolean roundRobin) {
085            this.roundRobin = roundRobin;
086        }
087    
088        public int getMaximumFailoverAttempts() {
089            return maximumFailoverAttempts;
090        }
091    
092        public void setMaximumFailoverAttempts(int maximumFailoverAttempts) {
093            this.maximumFailoverAttempts = maximumFailoverAttempts;
094        }
095    
096        /**
097         * Should the given failed Exchange failover?
098         *
099         * @param exchange the exchange that failed
100         * @return <tt>true</tt> to failover
101         */
102        protected boolean shouldFailOver(Exchange exchange) {
103            if (exchange == null) {
104                return false;
105            }
106    
107            boolean answer = false;
108    
109            if (exchange.getException() != null) {
110                if (exceptions == null || exceptions.isEmpty()) {
111                    // always failover if no exceptions defined
112                    answer = true;
113                } else {
114                    for (Class<?> exception : exceptions) {
115                        // will look in exception hierarchy
116                        if (exchange.getException(exception) != null) {
117                            answer = true;
118                            break;
119                        }
120                    }
121                }
122            }
123    
124            log.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId());
125    
126            return answer;
127        }
128    
129        @Override
130        public boolean isRunAllowed() {
131            // determine if we can still run, or the camel context is forcing a shutdown
132            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
133            if (forceShutdown) {
134                log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
135            }
136            return !forceShutdown && super.isRunAllowed();
137        }
138    
139        public boolean process(final Exchange exchange, final AsyncCallback callback) {
140            final List<Processor> processors = getProcessors();
141    
142            final AtomicInteger index = new AtomicInteger();
143            final AtomicInteger attempts = new AtomicInteger();
144            boolean first = true;
145            // use a copy of the original exchange before failover to avoid populating side effects
146            // directly into the original exchange
147            Exchange copy = null;
148    
149            // get the next processor
150            if (isRoundRobin()) {
151                if (counter.incrementAndGet() >= processors.size()) {
152                    counter.set(0);
153                }
154                index.set(counter.get());
155            }
156            log.trace("Failover starting with endpoint index {}", index);
157    
158            while (first || shouldFailOver(copy)) {
159    
160                // can we still run
161                if (!isRunAllowed()) {
162                    log.trace("Run not allowed, will reject executing exchange: {}", exchange);
163                    if (exchange.getException() == null) {
164                        exchange.setException(new RejectedExecutionException());
165                    }
166                    // we cannot process so invoke callback
167                    callback.done(true);
168                    return true;
169                }
170    
171                if (!first) {
172                    attempts.incrementAndGet();
173                    // are we exhausted by attempts?
174                    if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
175                        log.debug("Breaking out of failover after {} failover attempts", attempts);
176                        break;
177                    }
178    
179                    index.incrementAndGet();
180                    counter.incrementAndGet();
181                } else {
182                    // flip first switch
183                    first = false;
184                }
185    
186                if (index.get() >= processors.size()) {
187                    // out of bounds
188                    if (isRoundRobin()) {
189                        log.trace("Failover is round robin enabled and therefore starting from the first endpoint");
190                        index.set(0);
191                        counter.set(0);
192                    } else {
193                        // no more processors to try
194                        log.trace("Breaking out of failover as we reached the end of endpoints to use for failover");
195                        break;
196                    }
197                }
198    
199                // try again but copy original exchange before we failover
200                copy = prepareExchangeForFailover(exchange);
201                Processor processor = processors.get(index.get());
202    
203                // process the exchange
204                boolean sync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
205    
206                // continue as long its being processed synchronously
207                if (!sync) {
208                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
209                    // the remainder of the failover will be completed async
210                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
211                    return false;
212                }
213    
214                log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
215            }
216    
217            // and copy the current result to original so it will contain this result of this eip
218            if (copy != null) {
219                ExchangeHelper.copyResults(exchange, copy);
220            }
221            log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
222            callback.done(true);
223            return true;
224        }
225    
226        /**
227         * Prepares the exchange for failover
228         *
229         * @param exchange the exchange
230         * @return a copy of the exchange to use for failover
231         */
232        protected Exchange prepareExchangeForFailover(Exchange exchange) {
233            // use a copy of the exchange to avoid side effects on the original exchange
234            return ExchangeHelper.createCopy(exchange, true);
235        }
236    
237        private boolean processExchange(Processor processor, Exchange exchange, Exchange copy,
238                                        AtomicInteger attempts, AtomicInteger index,
239                                        AsyncCallback callback, List<Processor> processors) {
240            if (processor == null) {
241                throw new IllegalStateException("No processors could be chosen to process " + copy);
242            }
243            log.debug("Processing failover at attempt {} for {}", attempts, copy);
244    
245            AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
246            return albp.process(copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
247        }
248    
249        /**
250         * Failover logic to be executed asynchronously if one of the failover endpoints
251         * is a real {@link AsyncProcessor}.
252         */
253        private final class FailOverAsyncCallback implements AsyncCallback {
254    
255            private final Exchange exchange;
256            private Exchange copy;
257            private final AtomicInteger attempts;
258            private final AtomicInteger index;
259            private final AsyncCallback callback;
260            private final List<Processor> processors;
261    
262            private FailOverAsyncCallback(Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) {
263                this.exchange = exchange;
264                this.copy = copy;
265                this.attempts = attempts;
266                this.index = index;
267                this.callback = callback;
268                this.processors = processors;
269            }
270    
271            public void done(boolean doneSync) {
272                // we only have to handle async completion of the pipeline
273                if (doneSync) {
274                    return;
275                }
276    
277                while (shouldFailOver(copy)) {
278    
279                    // can we still run
280                    if (!isRunAllowed()) {
281                        log.trace("Run not allowed, will reject executing exchange: {}", exchange);
282                        if (exchange.getException() == null) {
283                            exchange.setException(new RejectedExecutionException());
284                        }
285                        // we cannot process so invoke callback
286                        callback.done(false);
287                    }
288    
289                    attempts.incrementAndGet();
290                    // are we exhausted by attempts?
291                    if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
292                        log.trace("Breaking out of failover after {} failover attempts", attempts);
293                        break;
294                    }
295    
296                    index.incrementAndGet();
297                    counter.incrementAndGet();
298    
299                    if (index.get() >= processors.size()) {
300                        // out of bounds
301                        if (isRoundRobin()) {
302                            log.trace("Failover is round robin enabled and therefore starting from the first endpoint");
303                            index.set(0);
304                            counter.set(0);
305                        } else {
306                            // no more processors to try
307                            log.trace("Breaking out of failover as we reached the end of endpoints to use for failover");
308                            break;
309                        }
310                    }
311    
312                    // try again but prepare exchange before we failover
313                    copy = prepareExchangeForFailover(exchange);
314                    Processor processor = processors.get(index.get());
315    
316                    // try to failover using the next processor
317                    doneSync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
318                    if (!doneSync) {
319                        log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
320                        // the remainder of the failover will be completed async
321                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
322                        return;
323                    }
324                }
325    
326                // and copy the current result to original so it will contain this result of this eip
327                if (copy != null) {
328                    ExchangeHelper.copyResults(exchange, copy);
329                }
330                log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
331                // signal callback we are done
332                callback.done(false);
333            };
334        }
335    
336        public String toString() {
337            return "FailoverLoadBalancer[" + getProcessors() + "]";
338        }
339    
340        public String getTraceLabel() {
341            return "failover";
342        }
343    }