001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.HashMap;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.Producer;
029    import org.apache.camel.ServicePoolAware;
030    import org.apache.camel.Traceable;
031    import org.apache.camel.impl.InterceptSendToEndpoint;
032    import org.apache.camel.impl.ProducerCache;
033    import org.apache.camel.support.ServiceSupport;
034    import org.apache.camel.util.AsyncProcessorConverterHelper;
035    import org.apache.camel.util.AsyncProcessorHelper;
036    import org.apache.camel.util.EventHelper;
037    import org.apache.camel.util.ObjectHelper;
038    import org.apache.camel.util.ServiceHelper;
039    import org.apache.camel.util.StopWatch;
040    import org.apache.camel.util.URISupport;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    /**
045     * Processor for forwarding exchanges to an endpoint destination.
046     *
047     * @version 
048     */
049    public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable {
050        protected static final Logger LOG = LoggerFactory.getLogger(SendProcessor.class);
051        protected final CamelContext camelContext;
052        protected final ExchangePattern pattern;
053        protected ProducerCache producerCache;
054        protected AsyncProcessor producer;
055        protected Endpoint destination;
056    
057        public SendProcessor(Endpoint destination) {
058            this(destination, null);
059        }
060    
061        public SendProcessor(Endpoint destination, ExchangePattern pattern) {
062            ObjectHelper.notNull(destination, "destination");
063            this.destination = destination;
064            this.camelContext = destination.getCamelContext();
065            this.pattern = pattern;
066            ObjectHelper.notNull(this.camelContext, "camelContext");
067        }
068    
069        @Override
070        public String toString() {
071            return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")";
072        }
073    
074        /**
075         * @deprecated not longer supported.
076         */
077        @Deprecated
078        public void setDestination(Endpoint destination) {
079        }
080    
081        public String getTraceLabel() {
082            return URISupport.sanitizeUri(destination.getEndpointUri());
083        }
084    
085        public void process(final Exchange exchange) throws Exception {
086            AsyncProcessorHelper.process(this, exchange);
087        }
088    
089        public boolean process(Exchange exchange, final AsyncCallback callback) {
090            if (!isStarted()) {
091                exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
092                callback.done(true);
093                return true;
094            }
095    
096            // we should preserve existing MEP so remember old MEP
097            // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
098            final ExchangePattern existingPattern = exchange.getPattern();
099    
100            // if we have a producer then use that as its optimized
101            if (producer != null) {
102    
103                // record timing for sending the exchange using the producer
104                final StopWatch watch = new StopWatch();
105    
106                final Exchange target = configureExchange(exchange, pattern);
107    
108                EventHelper.notifyExchangeSending(exchange.getContext(), target, destination);
109                LOG.debug(">>>> {} {}", destination, exchange);
110    
111                boolean sync = true;
112                try {
113                    sync = producer.process(exchange, new AsyncCallback() {
114                        @Override
115                        public void done(boolean doneSync) {
116                            try {
117                                // restore previous MEP
118                                target.setPattern(existingPattern);
119                                // emit event that the exchange was sent to the endpoint
120                                long timeTaken = watch.stop();
121                                EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken);
122                            } finally {
123                                callback.done(doneSync);
124                            }
125                        }
126                    });
127                } catch (Throwable throwable) {
128                    if (exchange != null) {
129                        exchange.setException(throwable);
130                    }
131    
132                }
133    
134                return sync;
135            }
136    
137            // send the exchange to the destination using the producer cache for the non optimized producers
138            return producerCache.doInAsyncProducer(destination, exchange, pattern, callback, new AsyncProducerCallback() {
139                public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
140                                                 ExchangePattern pattern, final AsyncCallback callback) {
141                    final Exchange target = configureExchange(exchange, pattern);
142                    LOG.debug(">>>> {} {}", destination, exchange);
143                    return asyncProducer.process(target, new AsyncCallback() {
144                        public void done(boolean doneSync) {
145                            // restore previous MEP
146                            target.setPattern(existingPattern);
147                            // signal we are done
148                            callback.done(doneSync);
149                        }
150                    });
151                }
152            });
153        }
154    
155        public Endpoint getDestination() {
156            return destination;
157        }
158    
159        public ExchangePattern getPattern() {
160            return pattern;
161        }
162    
163        protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
164            if (pattern != null) {
165                exchange.setPattern(pattern);
166            }
167            // set property which endpoint we send to
168            exchange.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
169            return exchange;
170        }
171    
172        protected void doStart() throws Exception {
173            if (producerCache == null) {
174                // use a single producer cache as we need to only hold reference for one destination
175                // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory
176                // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed,
177                // eg stopping the producer when we stop etc.
178                producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1));
179                // do not add as service as we do not want to manage the producer cache
180            }
181            ServiceHelper.startService(producerCache);
182    
183            // the destination could since have been intercepted by a interceptSendToEndpoint so we got to
184            // lookup this before we can use the destination
185            Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey());
186            if (lookup instanceof InterceptSendToEndpoint) {
187                if (LOG.isDebugEnabled()) {
188                    LOG.debug("Intercepted sending to {} -> {}",
189                            URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri()));
190                }
191                destination = lookup;
192            }
193            // warm up the producer by starting it so we can fail fast if there was a problem
194            // however must start endpoint first
195            ServiceHelper.startService(destination);
196    
197            // this SendProcessor is used a lot in Camel (eg every .to in the route DSL) and therefore we
198            // want to optimize for regular producers, by using the producer directly instead of the ProducerCache
199            // Only for pooled and non singleton producers we have to use the ProducerCache as it supports these
200            // kind of producer better (though these kind of producer should be rare)
201    
202            Producer producer = producerCache.acquireProducer(destination);
203            if (producer instanceof ServicePoolAware || !producer.isSingleton()) {
204                // no we cannot optimize it - so release the producer back to the producer cache
205                // and use the producer cache for sending
206                producerCache.releaseProducer(destination, producer);
207            } else {
208                // yes we can optimize and use the producer directly for sending
209                this.producer = AsyncProcessorConverterHelper.convert(producer);
210            }
211        }
212    
213        protected void doStop() throws Exception {
214            ServiceHelper.stopServices(producerCache, producer);
215        }
216    
217        protected void doShutdown() throws Exception {
218            ServiceHelper.stopAndShutdownServices(producerCache, producer);
219        }
220    }