/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
 * where an expression is evaluated to iterate through each of the parts of a
 * message and then each part is then sent to some endpoint.
 *
 * @version 
 */
public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
    private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);

    private final Expression expression;

    /**
     * Creates a splitter using the default options: every boolean option is
     * <tt>false</tt>, the timeout is <tt>0</tt>, and neither an executor service
     * nor an on-prepare processor is supplied.
     *
     * @param camelContext        the camel context
     * @param expression          the expression evaluated to obtain the parts, must not be <tt>null</tt>
     * @param destination         the processor each part is sent to, must not be <tt>null</tt>
     * @param aggregationStrategy the aggregation strategy, may be <tt>null</tt>
     */
    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
    }

    /**
     * Creates a splitter. All options except <tt>expression</tt> are handed over
     * unchanged to the {@link MulticastProcessor} super class, with the
     * destination wrapped as the single processor to multicast to.
     *
     * @param expression  the expression evaluated to obtain the parts, must not be <tt>null</tt>
     * @param destination the processor each part is sent to, must not be <tt>null</tt>
     */
    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
                shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
        this.expression = expression;
        // the super constructor call must come first, so the arguments can only be validated afterwards
        notNull(expression, "expression");
        notNull(destination, "destination");
    }

    @Override
    public String toString() {
        return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
    }

    @Override
    public String getTraceLabel() {
        return "split[" + expression + "]";
    }

    /**
     * Processes the exchange by delegating to the super class. When no
     * aggregation strategy has been configured, a per exchange
     * {@link UseOriginalAggregationStrategy} is installed on the exchange first.
     */
    @Override
    public boolean process(Exchange exchange, final AsyncCallback callback) {
        final AggregationStrategy strategy = getAggregationStrategy();

        // if no custom aggregation strategy is being used then fallback to keep the original
        // and propagate exceptions which is done by a per exchange specific aggregation strategy
        // to ensure it supports async routing
        if (strategy == null) {
            UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
            setAggregationStrategyOnExchange(exchange, original);
        }

        return super.process(exchange, callback);
    }

    /**
     * Evaluates the expression on the exchange and creates the pairs to process,
     * either lazily (streaming mode) or eagerly as a list (non streaming mode).
     *
     * @throws Exception the exception set on the exchange, if any, after evaluating
     *                   the expression or after creating the pairs
     */
    @Override
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        Object value = expression.evaluate(exchange, Object.class);
        if (exchange.getException() != null) {
            // force any exceptions occurred during evaluation to be thrown
            throw exchange.getException();
        }

        Iterable<ProcessorExchangePair> answer;
        if (isStreaming()) {
            answer = createProcessorExchangePairsIterable(exchange, value);
        } else {
            answer = createProcessorExchangePairsList(exchange, value);
        }
        if (exchange.getException() != null) {
            // force any exceptions occurred during creation of exchange pairs to be thrown
            // before returning the answer;
            throw exchange.getException();
        }

        return answer;
    }

    /**
     * Creates a lazy iterable which produces one pair per part of the given value.
     */
    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
        return new SplitterIterable(exchange, value);
    }

    /**
     * Lazy iterable over the parts of the evaluated expression value. Closing it
     * closes the value when the value is a {@link Scanner} or a {@link Closeable}.
     */
    private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {

        // the evaluated expression value, kept so it can be closed when done
        final Object value;
        // single iterator over the parts of the value; shared by every iterator handed out by iterator()
        final Iterator<?> iterator;
        // create a copy which we use as master to copy during splitting
        // this avoids any side effect reflected upon the incoming exchange
        private final Exchange copy;
        // the route context of the unit of work of the incoming exchange, or null if it has no unit of work
        private final RouteContext routeContext;

        private SplitterIterable(Exchange exchange, Object value) {
            this.value = value;
            this.iterator = ObjectHelper.createIterator(value);
            this.copy = copyExchangeNoAttachments(exchange, true);
            this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
        }

        @Override
        public Iterator<ProcessorExchangePair> iterator() {
            return new Iterator<ProcessorExchangePair>() {
                // index of the next pair to create
                private int index;
                // whether the end has been reached and the value has been closed
                private boolean closed;

                public boolean hasNext() {
                    if (closed) {
                        return false;
                    }

                    boolean answer = iterator.hasNext();
                    if (!answer) {
                        // we are now closed
                        closed = true;
                        // nothing more so we need to close the expression value in case it needs to be
                        try {
                            close();
                        } catch (IOException e) {
                            throw new RuntimeCamelException("Scanner aborted because of an IOException!", e);
                        }
                    }
                    return answer;
                }

                public ProcessorExchangePair next() {
                    Object part = iterator.next();
                    // create a correlated copy as the new exchange to be routed in the splitter from the copy
                    // and do not share the unit of work
                    Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
                    // if we share unit of work, we need to prepare the child exchange
                    if (isShareUnitOfWork()) {
                        prepareSharedUnitOfWork(newExchange, copy);
                    }
                    if (part instanceof Message) {
                        // the part is a complete message, so use it as-is as the IN message
                        newExchange.setIn((Message) part);
                    } else {
                        Message in = newExchange.getIn();
                        in.setBody(part);
                    }
                    return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
                }

                public void remove() {
                    throw new UnsupportedOperationException("Remove is not supported by this iterator");
                }
            };
        }

        /**
         * Closes the expression value if it is a {@link Scanner} or a {@link Closeable}.
         *
         * @throws IOException the last exception recorded by the scanner, or the failure to close the value
         */
        @Override
        public void close() throws IOException {
            if (value instanceof Scanner) {
                // special for Scanner which implement the Closeable since JDK7
                Scanner scanner = (Scanner) value;
                scanner.close();
                IOException ioException = scanner.ioException();
                if (ioException != null) {
                    throw ioException;
                }
            } else if (value instanceof Closeable) {
                // we should throw out the exception here
                IOHelper.closeWithException((Closeable) value);
            }
        }

    }

    /**
     * Eagerly creates all the pairs for the parts of the given value (non streaming mode).
     * <p/>
     * If creating the pairs fails half way, the value is closed before the failure
     * is propagated, so a {@link Scanner} or {@link Closeable} value is not leaked.
     */
    private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();

        // reuse iterable and add it to the result list
        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
        boolean completed = false;
        try {
            for (ProcessorExchangePair pair : pairs) {
                result.add(pair);
            }
            // the iterator closes the value itself when it reaches the end
            completed = true;
        } finally {
            if (!completed && pairs instanceof Closeable) {
                // the iteration was aborted by an exception, so the iterator never reached its end
                // and the value would otherwise stay open; a failure to close is only logged
                // so it does not hide the exception which aborted the iteration
                IOHelper.close((Closeable) pairs, "Splitter:ProcessorExchangePairs", LOG);
            }
        }

        return result;
    }

    /**
     * Prepares a splitted exchange: clears its unit of work and sets the
     * {@link Exchange#SPLIT_INDEX}, {@link Exchange#SPLIT_SIZE} and
     * {@link Exchange#SPLIT_COMPLETE} properties.
     */
    @Override
    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
                                     Iterator<ProcessorExchangePair> it) {
        // do not share unit of work
        exchange.setUnitOfWork(null);

        exchange.setProperty(Exchange.SPLIT_INDEX, index);
        if (allPairs instanceof Collection) {
            // non streaming mode, so we know the total size already
            exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
        }
        if (it.hasNext()) {
            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
            // streaming mode, so set total size when we are complete based on the index
            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
        }
    }

    @Override
    protected Integer getExchangeIndex(Exchange exchange) {
        return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
    }

    /**
     * Gets the expression which is evaluated to obtain the parts to split into.
     */
    public Expression getExpression() {
        return expression;
    }

    /**
     * Copies the exchange, and removes the attachments of the IN message and the
     * message history property from the copy.
     */
    private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) {
        Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
        // we do not want attachments for the splitted sub-messages
        answer.getIn().setAttachments(null);
        // we do not want to copy the message history for splitted sub-messages
        answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
        return answer;
    }
}