001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.concurrent.ExecutorService;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Producer;
029    import org.apache.camel.impl.ProducerCache;
030    import org.apache.camel.processor.aggregate.AggregationStrategy;
031    import org.apache.camel.spi.RouteContext;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.MessageHelper;
034    import org.apache.camel.util.ServiceHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Implements a dynamic <a
040     * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
041     * pattern where the list of actual endpoints to send a message exchange to are
042     * dependent on some dynamic expression.
043     * <p/>
044     * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
045     * on recipient lists. This implementation have to handle the fact the processors is not known at design time
046     * but evaluated at runtime from the dynamic recipient list. Therefore this implementation have to at runtime
047     * lookup endpoints and create producers which should act as the processors for the multicast processors which
048     * runs under the hood. Also this implementation supports the asynchronous routing engine which makes the code
049     * more trickier.
050     *
051     * @version 
052     */
053    public class RecipientListProcessor extends MulticastProcessor {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
056        private final Iterator<Object> iter;
057        private boolean ignoreInvalidEndpoints;
058        private ProducerCache producerCache;
059    
060        /**
061         * Class that represent each step in the recipient list to do
062         * <p/>
063         * This implementation ensures the provided producer is being released back in the producer cache when
064         * its done using it.
065         */
066        static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
067            private final int index;
068            private final Endpoint endpoint;
069            private final Producer producer;
070            private Processor prepared;
071            private final Exchange exchange;
072            private final ProducerCache producerCache;
073    
074            private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
075                                                   Processor prepared, Exchange exchange) {
076                this.index = index;
077                this.producerCache = producerCache;
078                this.endpoint = endpoint;
079                this.producer = producer;
080                this.prepared = prepared;
081                this.exchange = exchange;
082            }
083    
084            public int getIndex() {
085                return index;
086            }
087    
088            public Exchange getExchange() {
089                return exchange;
090            }
091    
092            public Producer getProducer() {
093                return producer;
094            }
095    
096            public Processor getProcessor() {
097                return prepared;
098            }
099    
100            public void begin() {
101                // we have already acquired and prepare the producer
102                LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
103                exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri());
104                // ensure stream caching is reset
105                MessageHelper.resetStreamCache(exchange.getIn());
106            }
107    
108            public void done() {
109                LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
110                // when we are done we should release back in pool
111                try {
112                    producerCache.releaseProducer(endpoint, producer);
113                } catch (Exception e) {
114                    if (LOG.isDebugEnabled()) {
115                        LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
116                    }
117                }
118            }
119    
120        }
121    
122        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
123            super(camelContext, null);
124            this.producerCache = producerCache;
125            this.iter = iter;
126        }
127    
128        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
129            super(camelContext, null, aggregationStrategy);
130            this.producerCache = producerCache;
131            this.iter = iter;
132        }
133    
134        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
135                                      boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
136                                      boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
137            super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
138                    streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
139            this.producerCache = producerCache;
140            this.iter = iter;
141        }
142    
143        public boolean isIgnoreInvalidEndpoints() {
144            return ignoreInvalidEndpoints;
145        }
146    
147        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
148            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
149        }
150    
151        @Override
152        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
153            // here we iterate the recipient lists and create the exchange pair for each of those
154            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
155    
156            // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
157            int index = 0;
158            while (iter.hasNext()) {
159                Object recipient = iter.next();
160                Endpoint endpoint;
161                Producer producer;
162                try {
163                    endpoint = resolveEndpoint(exchange, recipient);
164                    producer = producerCache.acquireProducer(endpoint);
165                } catch (Exception e) {
166                    if (isIgnoreInvalidEndpoints()) {
167                        if (LOG.isDebugEnabled()) {
168                            LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
169                        }
170                        continue;
171                    } else {
172                        // failure so break out
173                        throw e;
174                    }
175                }
176    
177                // then create the exchange pair
178                result.add(createProcessorExchangePair(index++, endpoint, producer, exchange));
179            }
180    
181            return result;
182        }
183    
184        /**
185         * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
186         */
187        protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) {
188            Processor prepared = producer;
189    
190            // copy exchange, and do not share the unit of work
191            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
192    
193            // if we share unit of work, we need to prepare the child exchange
194            if (isShareUnitOfWork()) {
195                prepareSharedUnitOfWork(copy, exchange);
196            }
197    
198            // set property which endpoint we send to
199            setToEndpoint(copy, prepared);
200    
201            // rework error handling to support fine grained error handling
202            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
203            prepared = createErrorHandler(routeContext, copy, prepared);
204    
205            // invoke on prepare on the exchange if specified
206            if (onPrepare != null) {
207                try {
208                    onPrepare.process(copy);
209                } catch (Exception e) {
210                    copy.setException(e);
211                }
212            }
213    
214            // and create the pair
215            return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
216        }
217    
218        protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
219            // trim strings as end users might have added spaces between separators
220            if (recipient instanceof String) {
221                recipient = ((String) recipient).trim();
222            }
223            return ExchangeHelper.resolveEndpoint(exchange, recipient);
224        }
225    
226        protected void doStart() throws Exception {
227            super.doStart();
228            if (producerCache == null) {
229                producerCache = new ProducerCache(this, getCamelContext());
230            }
231            ServiceHelper.startService(producerCache);
232        }
233    
234        protected void doStop() throws Exception {
235            ServiceHelper.stopService(producerCache);
236            super.doStop();
237        }
238    
239        protected void doShutdown() throws Exception {
240            ServiceHelper.stopAndShutdownService(producerCache);
241            super.doShutdown();
242        }
243    
244        @Override
245        public String toString() {
246            return "RecipientList";
247        }
248    
249        @Override
250        public String getTraceLabel() {
251            return "recipientList";
252        }
253    }