001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Expression;
024    import org.apache.camel.NoTypeConversionAvailableException;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Traceable;
027    import org.apache.camel.util.ExchangeHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * The processor which sends messages in a loop.
033     */
034    public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
035        private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
036    
037        private final Expression expression;
038        private final boolean copy;
039    
040        public LoopProcessor(Processor processor, Expression expression, boolean copy) {
041            super(processor);
042            this.expression = expression;
043            this.copy = copy;
044        }
045    
046        @Override
047        public boolean process(Exchange exchange, AsyncCallback callback) {
048            // use atomic integer to be able to pass reference and keep track on the values
049            AtomicInteger index = new AtomicInteger();
050            AtomicInteger count = new AtomicInteger();
051    
052            // Intermediate conversion to String is needed when direct conversion to Integer is not available
053            // but evaluation result is a textual representation of a numeric value.
054            String text = expression.evaluate(exchange, String.class);
055            try {
056                int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
057                count.set(num);
058            } catch (NoTypeConversionAvailableException e) {
059                exchange.setException(e);
060                callback.done(true);
061                return true;
062            }
063            
064            // we hold on to the original Exchange in case it's needed for copies
065            final Exchange original = exchange;
066            
067            // per-iteration exchange
068            Exchange target = exchange;
069    
070            // set the size before we start
071            exchange.setProperty(Exchange.LOOP_SIZE, count);
072    
073            // loop synchronously
074            while (index.get() < count.get()) {
075    
076                // and prepare for next iteration
077                // if (!copy) target = exchange; else copy of original
078                target = prepareExchange(exchange, index.get(), original);
079                boolean sync = process(target, callback, index, count, original);
080    
081                if (!sync) {
082                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
083                    // the remainder of the routing slip will be completed async
084                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
085                    return false;
086                }
087    
088                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
089    
090                // increment counter before next loop
091                index.getAndIncrement();
092            }
093    
094            // we are done so prepare the result
095            ExchangeHelper.copyResults(exchange, target);
096            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
097            callback.done(true);
098            return true;
099        }
100    
101        protected boolean process(final Exchange exchange, final AsyncCallback callback,
102                                  final AtomicInteger index, final AtomicInteger count,
103                                  final Exchange original) {
104    
105            // set current index as property
106            LOG.debug("LoopProcessor: iteration #{}", index.get());
107            exchange.setProperty(Exchange.LOOP_INDEX, index.get());
108            
109            boolean sync = processor.process(exchange, new AsyncCallback() {
110                public void done(boolean doneSync) {
111                    // we only have to handle async completion of the routing slip
112                    if (doneSync) {
113                        return;
114                    }
115    
116                    Exchange target = exchange;
117    
118                    // increment index as we have just processed once
119                    index.getAndIncrement();
120    
121                    // continue looping asynchronously
122                    while (index.get() < count.get()) {
123    
124                        // and prepare for next iteration
125                        target = prepareExchange(exchange, index.get(), original);
126    
127                        // process again
128                        boolean sync = process(target, callback, index, count, original);
129                        if (!sync) {
130                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
131                            // the remainder of the routing slip will be completed async
132                            // so we break out now, then the callback will be invoked which then continue routing from where we left here
133                            return;
134                        }
135    
136                        // increment counter before next loop
137                        index.getAndIncrement();
138                    }
139    
140                    // we are done so prepare the result
141                    ExchangeHelper.copyResults(exchange, target);
142                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
143                    callback.done(false);
144                }
145            });
146    
147            return sync;
148        }
149    
150        /**
151         * Prepares the exchange for the next iteration
152         *
153         * @param exchange the exchange
154         * @param index the index of the next iteration
155         * @return the exchange to use
156         */
157        protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
158            if (copy) {
159                // use a copy but let it reuse the same exchange id so it appear as one exchange
160                // use the original exchange rather than the looping exchange (esp. with the async routing engine)
161                return ExchangeHelper.createCopy(original, true);
162            } else {
163                ExchangeHelper.prepareOutToIn(exchange);
164                return exchange;
165            }
166        }
167    
168        public Expression getExpression() {
169            return expression;
170        }
171    
172        public String getTraceLabel() {
173            return "loop[" + expression + "]";
174        }
175    
176        @Override
177        public String toString() {
178            return "Loop[for: " + expression + " times do: " + getProcessor() + "]";
179        }
180    }