001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Iterator;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.Expression;
029    import org.apache.camel.FailedToCreateProducerException;
030    import org.apache.camel.Message;
031    import org.apache.camel.Producer;
032    import org.apache.camel.Traceable;
033    import org.apache.camel.builder.ExpressionBuilder;
034    import org.apache.camel.impl.DefaultExchange;
035    import org.apache.camel.impl.ProducerCache;
036    import org.apache.camel.support.ServiceSupport;
037    import org.apache.camel.util.AsyncProcessorHelper;
038    import org.apache.camel.util.ExchangeHelper;
039    import org.apache.camel.util.MessageHelper;
040    import org.apache.camel.util.ObjectHelper;
041    import org.apache.camel.util.ServiceHelper;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    import static org.apache.camel.processor.PipelineHelper.continueProcessing;
046    import static org.apache.camel.util.ObjectHelper.notNull;
047    
048    /**
049     * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
050     * pattern where the list of actual endpoints to send a message exchange to are
051     * dependent on the value of a message header.
052     * <p/>
053     * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
054     * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
055     * pipeline to ensure it works the same and the async routing engine is flawless.
056     */
057    public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
058        protected final Logger log = LoggerFactory.getLogger(getClass());
059        protected ProducerCache producerCache;
060        protected boolean ignoreInvalidEndpoints;
061        protected String header;
062        protected Expression expression;
063        protected String uriDelimiter;
064        protected final CamelContext camelContext;
065    
066        /**
067         * The iterator to be used for retrieving the next routing slip(s) to be used.
068         */
069        protected interface RoutingSlipIterator {
070    
071            /**
072             * Are the more routing slip(s)?
073             *
074             * @param exchange the current exchange
075             * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise.
076             */
077            boolean hasNext(Exchange exchange);
078    
079            /**
080             * Returns the next routing slip(s).
081             *
082             * @param exchange the current exchange
083             * @return the slip(s).
084             */
085            Object next(Exchange exchange);
086    
087        }
088    
089        public RoutingSlip(CamelContext camelContext) {
090            notNull(camelContext, "camelContext");
091            this.camelContext = camelContext;
092        }
093    
094        public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) {
095            notNull(camelContext, "camelContext");
096            notNull(expression, "expression");
097            
098            this.camelContext = camelContext;
099            this.expression = expression;
100            this.uriDelimiter = uriDelimiter;
101            this.header = null;
102        }
103        
104        public void setDelimiter(String delimiter) {
105            this.uriDelimiter = delimiter;
106        }
107        
108        public boolean isIgnoreInvalidEndpoints() {
109            return ignoreInvalidEndpoints;
110        }
111        
112        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
113            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
114        }
115    
116        @Override
117        public String toString() {
118            return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
119        }
120    
121        public String getTraceLabel() {
122            return "routingSlip[" + expression + "]";
123        }
124    
125        public void process(Exchange exchange) throws Exception {
126            AsyncProcessorHelper.process(this, exchange);
127        }
128    
129        public boolean process(Exchange exchange, AsyncCallback callback) {
130            if (!isStarted()) {
131                exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
132                callback.done(true);
133                return true;
134            }
135    
136            return doRoutingSlip(exchange, callback);
137        }
138    
139        public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
140            if (routingSlip instanceof Expression) {
141                this.expression = (Expression) routingSlip;
142            } else {
143                this.expression = ExpressionBuilder.constantExpression(routingSlip);
144            }
145            return doRoutingSlip(exchange, callback);
146        }
147    
148        /**
149         * Creates the route slip iterator to be used.
150         *
151         * @param exchange the exchange
152         * @return the iterator, should never be <tt>null</tt>
153         */
154        protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception {
155            Object slip = expression.evaluate(exchange, Object.class);
156            if (exchange.getException() != null) {
157                // force any exceptions occurred during evaluation to be thrown
158                throw exchange.getException();
159            }
160    
161            final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter);
162    
163            return new RoutingSlipIterator() {
164                public boolean hasNext(Exchange exchange) {
165                    return delegate.hasNext();
166                }
167    
168                public Object next(Exchange exchange) {
169                    return delegate.next();
170                }
171            };
172        }
173    
174        private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) {
175            Exchange current = exchange;
176            RoutingSlipIterator iter;
177            try {
178                iter = createRoutingSlipIterator(exchange);
179            } catch (Exception e) {
180                exchange.setException(e);
181                callback.done(true);
182                return true;
183            }
184    
185            // ensure the slip is empty when we start
186            if (current.hasProperties()) {
187                current.setProperty(Exchange.SLIP_ENDPOINT, null);
188            }
189    
190            while (iter.hasNext(current)) {
191                Endpoint endpoint;
192                try {
193                    endpoint = resolveEndpoint(iter, exchange);
194                    // if no endpoint was resolved then try the next
195                    if (endpoint == null) {
196                        continue;
197                    }
198                } catch (Exception e) {
199                    // error resolving endpoint so we should break out
200                    current.setException(e);
201                    break;
202                }
203    
204                // prepare and process the routing slip
205                Exchange copy = prepareExchangeForRoutingSlip(current, endpoint);
206                boolean sync = processExchange(endpoint, copy, exchange, callback, iter);
207                current = copy;
208    
209                if (!sync) {
210                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
211                    // the remainder of the routing slip will be completed async
212                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
213                    return false;
214                }
215    
216                log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
217    
218                // we ignore some kind of exceptions and allow us to continue
219                if (isIgnoreInvalidEndpoints()) {
220                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
221                    if (e != null) {
222                        if (log.isDebugEnabled()) {
223                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
224                        }
225                        current.setException(null);
226                    }
227                }
228    
229                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
230                // check for error if so we should break out
231                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
232                    break;
233                }
234            }
235    
236            // logging nextExchange as it contains the exchange that might have altered the payload and since
237            // we are logging the completion if will be confusing if we log the original instead
238            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
239            log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
240    
241            // copy results back to the original exchange
242            ExchangeHelper.copyResults(exchange, current);
243    
244            callback.done(true);
245            return true;
246        }
247    
248        protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
249            Object nextRecipient = iter.next(exchange);
250            Endpoint endpoint = null;
251            try {
252                endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
253            } catch (Exception e) {
254                if (isIgnoreInvalidEndpoints()) {
255                    log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
256                } else {
257                    throw e;
258                }
259            }
260            return endpoint;
261        }
262    
263        protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
264            Exchange copy = new DefaultExchange(current);
265            // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
266            // before processing the next step in the pipeline, so we have a snapshot of the exchange
267            // just before. This snapshot is used if Camel should do redeliveries (re try) using
268            // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
269            // exchange being routed.
270            copy.setExchangeId(current.getExchangeId());
271            copyOutToIn(copy, current);
272    
273            // ensure stream caching is reset
274            MessageHelper.resetStreamCache(copy.getIn());
275    
276            return copy;
277        }
278    
279        protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
280                                          final AsyncCallback callback, final RoutingSlipIterator iter) {
281    
282            // this does the actual processing so log at trace level
283            log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
284    
285            boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() {
286                public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
287                                                 ExchangePattern exchangePattern, final AsyncCallback callback) {
288                    // set property which endpoint we send to
289                    exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
290                    exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
291    
292                    return asyncProducer.process(exchange, new AsyncCallback() {
293                        public void done(boolean doneSync) {
294                            // we only have to handle async completion of the routing slip
295                            if (doneSync) {
296                                callback.done(doneSync);
297                                return;
298                            }
299    
300                            // continue processing the routing slip asynchronously
301                            Exchange current = exchange;
302    
303                            while (iter.hasNext(current)) {
304    
305                                // we ignore some kind of exceptions and allow us to continue
306                                if (isIgnoreInvalidEndpoints()) {
307                                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
308                                    if (e != null) {
309                                        if (log.isDebugEnabled()) {
310                                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
311                                        }
312                                        current.setException(null);
313                                    }
314                                }
315    
316                                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
317                                // check for error if so we should break out
318                                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
319                                    break;
320                                }
321    
322                                Endpoint endpoint;
323                                try {
324                                    endpoint = resolveEndpoint(iter, exchange);
325                                    // if no endpoint was resolved then try the next
326                                    if (endpoint == null) {
327                                        continue;
328                                    }
329                                } catch (Exception e) {
330                                    // error resolving endpoint so we should break out
331                                    exchange.setException(e);
332                                    break;
333                                }
334    
335                                // prepare and process the routing slip
336                                Exchange copy = prepareExchangeForRoutingSlip(current, endpoint);
337                                boolean sync = processExchange(endpoint, copy, original, callback, iter);
338                                current = copy;
339    
340                                if (!sync) {
341                                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
342                                    return;
343                                }
344                            }
345    
346                            // logging nextExchange as it contains the exchange that might have altered the payload and since
347                            // we are logging the completion if will be confusing if we log the original instead
348                            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
349                            log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
350    
351                            // copy results back to the original exchange
352                            ExchangeHelper.copyResults(original, current);
353                            callback.done(false);
354                        }
355                    });
356                }
357            });
358    
359            return sync;
360        }
361    
362        protected void doStart() throws Exception {
363            if (producerCache == null) {
364                producerCache = new ProducerCache(this, camelContext);
365            }
366            ServiceHelper.startService(producerCache);
367        }
368    
369        protected void doStop() throws Exception {
370            ServiceHelper.stopService(producerCache);
371        }
372    
373        protected void doShutdown() throws Exception {
374            ServiceHelper.stopAndShutdownService(producerCache);
375        }
376    
377        /**
378         * Returns the outbound message if available. Otherwise return the inbound message.
379         */
380        private Message getResultMessage(Exchange exchange) {
381            if (exchange.hasOut()) {
382                return exchange.getOut();
383            } else {
384                // if this endpoint had no out (like a mock endpoint) just take the in
385                return exchange.getIn();
386            }
387        }
388    
389        /**
390         * Copy the outbound data in 'source' to the inbound data in 'result'.
391         */
392        private void copyOutToIn(Exchange result, Exchange source) {
393            result.setException(source.getException());
394    
395            if (source.hasOut() && source.getOut().isFault()) {
396                result.getOut().copyFrom(source.getOut());
397            }
398    
399            result.setIn(getResultMessage(source));
400    
401            result.getProperties().clear();
402            result.getProperties().putAll(source.getProperties());
403        }
404    }