001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.Consumer;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.EndpointConfiguration;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.ExchangePattern;
029    import org.apache.camel.PollingConsumer;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Producer;
032    import org.apache.camel.util.ServiceHelper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    import static org.apache.camel.processor.PipelineHelper.continueProcessing;
037    
038    /**
039     * This is an endpoint when sending to it, is intercepted and is routed in a detour
040     *
041     * @version 
042     */
043    public class InterceptSendToEndpoint implements Endpoint {
044    
045        private static final Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpoint.class);
046    
047        private final Endpoint delegate;
048        private Producer producer;
049        private Processor detour;
050        private boolean skip;
051    
052        /**
053         * Intercepts sending to the given endpoint
054         *
055         * @param destination  the original endpoint
056         * @param skip <tt>true</tt> to skip sending after the detour to the original endpoint
057         */
058        public InterceptSendToEndpoint(final Endpoint destination, boolean skip) {
059            this.delegate = destination;
060            this.skip = skip;
061        }
062    
063        public void setDetour(Processor detour) {
064            this.detour = detour;
065        }
066    
067        public Endpoint getDelegate() {
068            return delegate;
069        }
070    
071        public String getEndpointUri() {
072            return delegate.getEndpointUri();
073        }
074    
075        public EndpointConfiguration getEndpointConfiguration() {
076            return delegate.getEndpointConfiguration();
077        }
078    
079        public String getEndpointKey() {
080            return delegate.getEndpointKey();
081        }
082    
083        public Exchange createExchange() {
084            return delegate.createExchange();
085        }
086    
087        public Exchange createExchange(ExchangePattern pattern) {
088            return delegate.createExchange(pattern);
089        }
090    
091        public Exchange createExchange(Exchange exchange) {
092            return delegate.createExchange(exchange);
093        }
094    
095        public CamelContext getCamelContext() {
096            return delegate.getCamelContext();
097        }
098    
099        public Producer createProducer() throws Exception {
100            producer = delegate.createProducer();
101            return new DefaultAsyncProducer(delegate) {
102    
103                public Endpoint getEndpoint() {
104                    return producer.getEndpoint();
105                }
106    
107                public Exchange createExchange() {
108                    return producer.createExchange();
109                }
110    
111                public Exchange createExchange(ExchangePattern pattern) {
112                    return producer.createExchange(pattern);
113                }
114    
115                public Exchange createExchange(Exchange exchange) {
116                    return producer.createExchange(exchange);
117                }
118    
119                @Override
120                public boolean process(Exchange exchange, AsyncCallback callback) {
121                    // process the detour so we do the detour routing
122                    if (LOG.isDebugEnabled()) {
123                        LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, exchange});
124                    }
125                    // add header with the real endpoint uri
126                    exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri());
127    
128                    // detour the exchange using synchronous processing
129                    try {
130                        detour.process(exchange);
131                    } catch (Exception e) {
132                        exchange.setException(e);
133                        callback.done(true);
134                        return true;
135                    }
136    
137                    // Decide whether to continue or not; similar logic to the Pipeline
138                    // check for error if so we should break out
139                    if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) {
140                        callback.done(true);
141                        return true;
142                    }
143    
144                    // determine if we should skip or not
145                    boolean shouldSkip = skip;
146    
147                    // if then interceptor had a when predicate, then we should only skip if it matched
148                    Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
149                    if (whenMatches != null) {
150                        shouldSkip = skip && whenMatches;
151                    }
152    
153                    if (!shouldSkip) {
154                        if (exchange.hasOut()) {
155                            // replace OUT with IN as detour changed something
156                            exchange.setIn(exchange.getOut());
157                            exchange.setOut(null);
158                        }
159    
160                        // route to original destination leveraging the asynchronous routing engine if possible
161                        if (producer instanceof AsyncProcessor) {
162                            AsyncProcessor async = (AsyncProcessor) producer;
163                            return async.process(exchange, callback);
164                        } else {
165                            try {
166                                producer.process(exchange);
167                            } catch (Exception e) {
168                                exchange.setException(e);
169                            }
170                            callback.done(true);
171                            return true;
172                        }
173                    } else {
174                        if (LOG.isDebugEnabled()) {
175                            LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);
176                        }
177                        callback.done(true);
178                        return true;
179                    }
180                }
181    
182                public boolean isSingleton() {
183                    return producer.isSingleton();
184                }
185    
186                public void start() throws Exception {
187                    ServiceHelper.startService(detour);
188                    // here we also need to start the producer
189                    ServiceHelper.startService(producer);
190                }
191    
192                public void stop() throws Exception {
193                    // do not stop detour as it should only be stopped when the interceptor stops
194                    // we should stop the producer here
195                    ServiceHelper.stopService(producer);
196                }
197            };
198        }
199    
200        public Consumer createConsumer(Processor processor) throws Exception {
201            return delegate.createConsumer(processor);
202        }
203    
204        public PollingConsumer createPollingConsumer() throws Exception {
205            return delegate.createPollingConsumer();
206        }
207    
208        public void configureProperties(Map<String, Object> options) {
209            delegate.configureProperties(options);
210        }
211    
212        public void setCamelContext(CamelContext context) {
213            delegate.setCamelContext(context);
214        }
215    
216        public boolean isLenientProperties() {
217            return delegate.isLenientProperties();
218        }
219    
220        public boolean isSingleton() {
221            return delegate.isSingleton();
222        }
223    
224        public void start() throws Exception {
225            ServiceHelper.startServices(detour, delegate);
226        }
227    
228        public void stop() throws Exception {
229            ServiceHelper.stopServices(delegate, detour);
230        }
231    
232        @Override
233        public String toString() {
234            return delegate.toString();
235        }
236    }