001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collections;
020    import java.util.Iterator;
021    import java.util.concurrent.ExecutorService;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Endpoint;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Expression;
029    import org.apache.camel.Processor;
030    import org.apache.camel.impl.ProducerCache;
031    import org.apache.camel.processor.aggregate.AggregationStrategy;
032    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
033    import org.apache.camel.support.ServiceSupport;
034    import org.apache.camel.util.AsyncProcessorHelper;
035    import org.apache.camel.util.ExchangeHelper;
036    import org.apache.camel.util.ObjectHelper;
037    import org.apache.camel.util.ServiceHelper;
038    
039    import static org.apache.camel.util.ObjectHelper.notNull;
040    
041    /**
042     * Implements a dynamic <a
043     * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
044     * pattern where the list of actual endpoints to send a message exchange to are
045     * dependent on some dynamic expression.
046     *
047     * @version 
048     */
049    public class RecipientList extends ServiceSupport implements AsyncProcessor {
050        private static final String IGNORE_DELIMITER_MARKER = "false";
051        private final CamelContext camelContext;
052        private ProducerCache producerCache;
053        private Expression expression;
054        private final String delimiter;
055        private boolean parallelProcessing;
056        private boolean stopOnException;
057        private boolean ignoreInvalidEndpoints;
058        private boolean streaming;
059        private long timeout;
060        private Processor onPrepare;
061        private boolean shareUnitOfWork;
062        private ExecutorService executorService;
063        private boolean shutdownExecutorService;
064        private ExecutorService aggregateExecutorService;
065        private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
066    
067        public RecipientList(CamelContext camelContext) {
068            // use comma by default as delimiter
069            this(camelContext, ",");
070        }
071    
072        public RecipientList(CamelContext camelContext, String delimiter) {
073            notNull(camelContext, "camelContext");
074            ObjectHelper.notEmpty(delimiter, "delimiter");
075            this.camelContext = camelContext;
076            this.delimiter = delimiter;
077        }
078    
079        public RecipientList(CamelContext camelContext, Expression expression) {
080            // use comma by default as delimiter
081            this(camelContext, expression, ",");
082        }
083    
084        public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
085            notNull(camelContext, "camelContext");
086            ObjectHelper.notNull(expression, "expression");
087            ObjectHelper.notEmpty(delimiter, "delimiter");
088            this.camelContext = camelContext;
089            this.expression = expression;
090            this.delimiter = delimiter;
091        }
092    
093        @Override
094        public String toString() {
095            return "RecipientList[" + (expression != null ? expression : "") + "]";
096        }
097    
098        public void process(Exchange exchange) throws Exception {
099            AsyncProcessorHelper.process(this, exchange);
100        }
101    
102        public boolean process(Exchange exchange, AsyncCallback callback) {
103            if (!isStarted()) {
104                throw new IllegalStateException("RecipientList has not been started: " + this);
105            }
106    
107            // use the evaluate expression result if exists
108            Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
109            if (recipientList == null && expression != null) {
110                // fallback and evaluate the expression
111                recipientList = expression.evaluate(exchange, Object.class);
112            }
113    
114            return sendToRecipientList(exchange, recipientList, callback);
115        }
116    
117        /**
118         * Sends the given exchange to the recipient list
119         */
120        public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
121            Iterator<Object> iter;
122    
123            if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
124                iter = ObjectHelper.createIterator(recipientList, null);
125            } else {
126                iter = ObjectHelper.createIterator(recipientList, delimiter);
127            }
128    
129            RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
130                    isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
131                    isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
132                @Override
133                protected synchronized ExecutorService createAggregateExecutorService(String name) {
134                    // use a shared executor service to avoid creating new thread pools
135                    if (aggregateExecutorService == null) {
136                        aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
137                    }
138                    return aggregateExecutorService;
139                }
140            };
141            rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
142    
143            // start the service
144            try {
145                ServiceHelper.startService(rlp);
146            } catch (Exception e) {
147                exchange.setException(e);
148                callback.done(true);
149                return true;
150            }
151    
152            AsyncProcessor target = rlp;
153            if (isShareUnitOfWork()) {
154                // wrap answer in a sub unit of work, since we share the unit of work
155                CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp);
156                internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
157                target = internalProcessor;
158            }
159    
160            // now let the multicast process the exchange
161            return target.process(exchange, callback);
162        }
163    
164        protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
165            // trim strings as end users might have added spaces between separators
166            if (recipient instanceof String) {
167                recipient = ((String)recipient).trim();
168            }
169            return ExchangeHelper.resolveEndpoint(exchange, recipient);
170        }
171    
172        protected void doStart() throws Exception {
173            if (producerCache == null) {
174                producerCache = new ProducerCache(this, camelContext);
175            }
176            ServiceHelper.startServices(aggregationStrategy, producerCache);
177        }
178    
179        protected void doStop() throws Exception {
180            ServiceHelper.stopServices(producerCache, aggregationStrategy);
181        }
182    
183        protected void doShutdown() throws Exception {
184            ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
185    
186            if (shutdownExecutorService && executorService != null) {
187                camelContext.getExecutorServiceManager().shutdownNow(executorService);
188            }
189        }
190    
191        public boolean isStreaming() {
192            return streaming;
193        }
194        
195        public void setStreaming(boolean streaming) {
196            this.streaming = streaming;
197        }
198     
199        public boolean isIgnoreInvalidEndpoints() {
200            return ignoreInvalidEndpoints;
201        }
202        
203        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
204            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
205        }
206    
207        public boolean isParallelProcessing() {
208            return parallelProcessing;
209        }
210    
211        public void setParallelProcessing(boolean parallelProcessing) {
212            this.parallelProcessing = parallelProcessing;
213        }
214    
215        public boolean isStopOnException() {
216            return stopOnException;
217        }
218    
219        public void setStopOnException(boolean stopOnException) {
220            this.stopOnException = stopOnException;
221        }
222    
223        public ExecutorService getExecutorService() {
224            return executorService;
225        }
226    
227        public void setExecutorService(ExecutorService executorService) {
228            this.executorService = executorService;
229        }
230    
231        public boolean isShutdownExecutorService() {
232            return shutdownExecutorService;
233        }
234    
235        public void setShutdownExecutorService(boolean shutdownExecutorService) {
236            this.shutdownExecutorService = shutdownExecutorService;
237        }
238    
239        public AggregationStrategy getAggregationStrategy() {
240            return aggregationStrategy;
241        }
242    
243        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
244            this.aggregationStrategy = aggregationStrategy;
245        }
246    
247        public long getTimeout() {
248            return timeout;
249        }
250    
251        public void setTimeout(long timeout) {
252            this.timeout = timeout;
253        }
254    
255        public Processor getOnPrepare() {
256            return onPrepare;
257        }
258    
259        public void setOnPrepare(Processor onPrepare) {
260            this.onPrepare = onPrepare;
261        }
262    
263        public boolean isShareUnitOfWork() {
264            return shareUnitOfWork;
265        }
266    
267        public void setShareUnitOfWork(boolean shareUnitOfWork) {
268            this.shareUnitOfWork = shareUnitOfWork;
269        }
270    }