001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.io.Closeable;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Collection;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Scanner;
027    import java.util.concurrent.ExecutorService;
028    
029    import org.apache.camel.AsyncCallback;
030    import org.apache.camel.AsyncProcessor;
031    import org.apache.camel.CamelContext;
032    import org.apache.camel.Exchange;
033    import org.apache.camel.Expression;
034    import org.apache.camel.Message;
035    import org.apache.camel.Processor;
036    import org.apache.camel.RuntimeCamelException;
037    import org.apache.camel.Traceable;
038    import org.apache.camel.processor.aggregate.AggregationStrategy;
039    import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
040    import org.apache.camel.spi.RouteContext;
041    import org.apache.camel.util.ExchangeHelper;
042    import org.apache.camel.util.IOHelper;
043    import org.apache.camel.util.ObjectHelper;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    import static org.apache.camel.util.ObjectHelper.notNull;
048    
049    /**
050     * Implements a dynamic <a
051     * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
052     * where an expression is evaluated to iterate through each of the parts of a
053     * message and then each part is then send to some endpoint.
054     *
055     * @version 
056     */
057    public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
058        private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
059    
060        private final Expression expression;
061    
062        public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
063            this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
064        }
065    
066        public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
067                        boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
068                        boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
069            super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
070                    shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork);
071            this.expression = expression;
072            notNull(expression, "expression");
073            notNull(destination, "destination");
074        }
075    
076        @Override
077        public String toString() {
078            return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
079        }
080    
081        @Override
082        public String getTraceLabel() {
083            return "split[" + expression + "]";
084        }
085    
086        @Override
087        public boolean process(Exchange exchange, final AsyncCallback callback) {
088            final AggregationStrategy strategy = getAggregationStrategy();
089    
090            // if no custom aggregation strategy is being used then fallback to keep the original
091            // and propagate exceptions which is done by a per exchange specific aggregation strategy
092            // to ensure it supports async routing
093            if (strategy == null) {
094                UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
095                setAggregationStrategyOnExchange(exchange, original);
096            }
097    
098            return super.process(exchange, callback);
099        }
100    
101        @Override
102        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
103            Object value = expression.evaluate(exchange, Object.class);
104            if (exchange.getException() != null) {
105                // force any exceptions occurred during evaluation to be thrown
106                throw exchange.getException();
107            }
108    
109            Iterable<ProcessorExchangePair> answer;
110            if (isStreaming()) {
111                answer = createProcessorExchangePairsIterable(exchange, value);
112            } else {
113                answer = createProcessorExchangePairsList(exchange, value);
114            }
115            if (exchange.getException() != null) {
116                // force any exceptions occurred during creation of exchange paris to be thrown
117                // before returning the answer;
118                throw exchange.getException();
119            }
120    
121            return answer;
122        }
123    
124        private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
125            return new SplitterIterable(exchange, value);
126        }
127    
128        private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {
129    
130            // create a copy which we use as master to copy during splitting
131            // this avoids any side effect reflected upon the incoming exchange
132            final Object value;
133            final Iterator<?> iterator;
134            private final Exchange copy;
135            private final RouteContext routeContext;
136    
137            private SplitterIterable(Exchange exchange, Object value) {
138                this.value = value;
139                this.iterator = ObjectHelper.createIterator(value);
140                this.copy = copyExchangeNoAttachments(exchange, true);
141                this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
142            }
143    
144            @Override
145            public Iterator<ProcessorExchangePair> iterator() {
146                return new Iterator<ProcessorExchangePair>() {
147                    private int index;
148                    private boolean closed;
149    
150                    public boolean hasNext() {
151                        if (closed) {
152                            return false;
153                        }
154    
155                        boolean answer = iterator.hasNext();
156                        if (!answer) {
157                            // we are now closed
158                            closed = true;
159                            // nothing more so we need to close the expression value in case it needs to be
160                            try {
161                                close();
162                            } catch (IOException e) {
163                                throw new RuntimeCamelException("Scanner aborted because of an IOException!", e);
164                            }
165                        }
166                        return answer;
167                    }
168    
169                    public ProcessorExchangePair next() {
170                        Object part = iterator.next();
171                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
172                        // and do not share the unit of work
173                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
174                        // if we share unit of work, we need to prepare the child exchange
175                        if (isShareUnitOfWork()) {
176                            prepareSharedUnitOfWork(newExchange, copy);
177                        }
178                        if (part instanceof Message) {
179                            newExchange.setIn((Message) part);
180                        } else {
181                            Message in = newExchange.getIn();
182                            in.setBody(part);
183                        }
184                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
185                    }
186    
187                    public void remove() {
188                        throw new UnsupportedOperationException("Remove is not supported by this iterator");
189                    }
190                };
191            }
192    
193            @Override
194            public void close() throws IOException {
195                if (value instanceof Scanner) {
196                    // special for Scanner which implement the Closeable since JDK7 
197                    Scanner scanner = (Scanner) value;
198                    scanner.close();
199                    IOException ioException = scanner.ioException();
200                    if (ioException != null) {
201                        throw ioException;
202                    }
203                } else if (value instanceof Closeable) {
204                    // we should throw out the exception here   
205                    IOHelper.closeWithException((Closeable) value);
206                }
207            }
208           
209        }
210    
211        private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
212            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
213    
214            // reuse iterable and add it to the result list
215            Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
216            for (ProcessorExchangePair pair : pairs) {
217                result.add(pair);
218            }
219    
220            return result;
221        }
222    
223        @Override
224        protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
225                                         Iterator<ProcessorExchangePair> it) {
226            // do not share unit of work
227            exchange.setUnitOfWork(null);
228    
229            exchange.setProperty(Exchange.SPLIT_INDEX, index);
230            if (allPairs instanceof Collection) {
231                // non streaming mode, so we know the total size already
232                exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
233            }
234            if (it.hasNext()) {
235                exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
236            } else {
237                exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
238                // streaming mode, so set total size when we are complete based on the index
239                exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
240            }
241        }
242    
243        @Override
244        protected Integer getExchangeIndex(Exchange exchange) {
245            return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
246        }
247    
248        public Expression getExpression() {
249            return expression;
250        }
251        
252        private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) {
253            Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
254            // we do not want attachments for the splitted sub-messages
255            answer.getIn().setAttachments(null);
256            // we do not want to copy the message history for splitted sub-messages
257            answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
258            return answer;
259        }
260    }