/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The processor which sends messages in a loop.
 * <p/>
 * The number of iterations is obtained by evaluating the configured expression against the
 * incoming exchange. Before looping, the {@link Exchange#LOOP_SIZE} property is set on the
 * incoming exchange; before each iteration the {@link Exchange#LOOP_INDEX} property is set
 * (zero-based) on the exchange used for that iteration.
 * <p/>
 * When <tt>copy</tt> is enabled every iteration runs on a fresh copy of the original exchange;
 * otherwise the same exchange is reused and prepared with
 * {@link ExchangeHelper#prepareOutToIn(Exchange)} between iterations.
 */
public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
    private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);

    private final Expression expression;
    private final boolean copy;

    /**
     * Creates the loop processor.
     *
     * @param processor  the processor to invoke on each iteration
     * @param expression the expression evaluated to get the number of iterations
     * @param copy       whether each iteration should work on a copy of the original exchange
     */
    public LoopProcessor(Processor processor, Expression expression, boolean copy) {
        super(processor);
        this.expression = expression;
        this.copy = copy;
    }

    /**
     * Evaluates the loop count and runs the iterations, synchronously for as long as the
     * delegate processor completes synchronously.
     *
     * @param exchange the exchange to loop over
     * @param callback the callback to notify when all iterations are done
     * @return <tt>true</tt> if the loop completed synchronously (the callback has been invoked with
     *         <tt>true</tt>), <tt>false</tt> if an iteration continues asynchronously
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // use atomic integer to be able to pass reference and keep track on the values
        AtomicInteger index = new AtomicInteger();
        AtomicInteger count = new AtomicInteger();

        // Intermediate conversion to String is needed when direct conversion to Integer is not available
        // but evaluation result is a textual representation of a numeric value.
        String text = expression.evaluate(exchange, String.class);
        try {
            int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
            count.set(num);
        } catch (NoTypeConversionAvailableException e) {
            // cannot determine the loop count, so fail the exchange and complete right away
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        // we hold on to the original Exchange in case it's needed for copies
        final Exchange original = exchange;

        // per-iteration exchange
        Exchange target = exchange;

        // set the size before we start
        // (note: the AtomicInteger holder itself is stored as the property value)
        exchange.setProperty(Exchange.LOOP_SIZE, count);

        // loop synchronously
        while (index.get() < count.get()) {

            // and prepare for next iteration
            // if (!copy) target = exchange; else copy of original
            target = prepareExchange(exchange, index.get(), original);
            boolean sync = process(target, callback, index, count, original);

            if (!sync) {
                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
                // the remainder of the loop will be completed async
                // so we break out now, then the callback will be invoked which then continue looping from where we left here
                return false;
            }

            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());

            // increment counter before next loop
            index.getAndIncrement();
        }

        // we are done so prepare the result
        ExchangeHelper.copyResults(exchange, target);
        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
        callback.done(true);
        return true;
    }

    /**
     * Processes a single iteration and, if that iteration completes asynchronously, continues
     * the remaining iterations from within the asynchronous callback.
     *
     * @param exchange the exchange to use for this iteration (a copy of the original in copy mode)
     * @param callback the callback to notify when the whole loop is done
     * @param index    the shared holder of the current iteration index
     * @param count    the shared holder of the total number of iterations
     * @param original the exchange the loop was started with; receives the final result
     * @return <tt>true</tt> if this iteration completed synchronously, <tt>false</tt> if it continues
     *         asynchronously
     */
    protected boolean process(final Exchange exchange, final AsyncCallback callback,
                              final AtomicInteger index, final AtomicInteger count,
                              final Exchange original) {

        // set current index as property
        LOG.debug("LoopProcessor: iteration #{}", index.get());
        exchange.setProperty(Exchange.LOOP_INDEX, index.get());

        boolean sync = processor.process(exchange, new AsyncCallback() {
            public void done(boolean doneSync) {
                // we only have to handle async completion of the loop
                if (doneSync) {
                    return;
                }

                Exchange target = exchange;

                // increment index as we have just processed once
                index.getAndIncrement();

                // continue looping asynchronously
                while (index.get() < count.get()) {

                    // and prepare for next iteration
                    target = prepareExchange(exchange, index.get(), original);

                    // process again
                    boolean iterationSync = process(target, callback, index, count, original);
                    if (!iterationSync) {
                        LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
                        // the remainder of the loop will be completed async
                        // so we break out now, then the callback will be invoked which then continue looping from where we left here
                        return;
                    }

                    // increment counter before next loop
                    index.getAndIncrement();
                }

                // we are done so prepare the result on the original exchange; in copy mode the
                // exchange of this iteration is only a copy, so copying the result onto it would
                // never reach the exchange the caller is holding on to
                ExchangeHelper.copyResults(original, target);
                LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original);
                callback.done(false);
            }
        });

        return sync;
    }

    /**
     * Prepares the exchange for the next iteration
     *
     * @param exchange the exchange
     * @param index the index of the next iteration
     * @param original the exchange the loop was started with, used as the source of copies in copy mode
     * @return the exchange to use
     */
    protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
        if (copy) {
            // use a copy but let it reuse the same exchange id so it appear as one exchange
            // use the original exchange rather than the looping exchange (esp. with the async routing engine)
            return ExchangeHelper.createCopy(original, true);
        } else {
            ExchangeHelper.prepareOutToIn(exchange);
            return exchange;
        }
    }

    /**
     * Gets the expression that is evaluated to get the number of iterations.
     */
    public Expression getExpression() {
        return expression;
    }

    public String getTraceLabel() {
        return "loop[" + expression + "]";
    }

    @Override
    public String toString() {
        return "Loop[for: " + expression + " times do: " + getProcessor() + "]";
    }
}