001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.ExecutorService;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.AsyncProcessor;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.ExchangePattern;
027    import org.apache.camel.Message;
028    import org.apache.camel.Ordered;
029    import org.apache.camel.Predicate;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Traceable;
032    import org.apache.camel.support.ServiceSupport;
033    import org.apache.camel.support.SynchronizationAdapter;
034    import org.apache.camel.util.AsyncProcessorHelper;
035    import org.apache.camel.util.ExchangeHelper;
036    import org.apache.camel.util.ServiceHelper;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    import static org.apache.camel.util.ObjectHelper.notNull;
041    
042    /**
043     * @version 
044     */
045    public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable {
046    
047        private static final Logger LOG = LoggerFactory.getLogger(OnCompletionProcessor.class);
048        private final CamelContext camelContext;
049        private final Processor processor;
050        private final ExecutorService executorService;
051        private final boolean shutdownExecutorService;
052        private final boolean onCompleteOnly;
053        private final boolean onFailureOnly;
054        private final Predicate onWhen;
055        private final boolean useOriginalBody;
056    
057        public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
058                                     boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
059            notNull(camelContext, "camelContext");
060            notNull(processor, "processor");
061            this.camelContext = camelContext;
062            this.processor = processor;
063            this.executorService = executorService;
064            this.shutdownExecutorService = shutdownExecutorService;
065            this.onCompleteOnly = onCompleteOnly;
066            this.onFailureOnly = onFailureOnly;
067            this.onWhen = onWhen;
068            this.useOriginalBody = useOriginalBody;
069        }
070    
071        @Override
072        protected void doStart() throws Exception {
073            ServiceHelper.startService(processor);
074        }
075    
076        @Override
077        protected void doStop() throws Exception {
078            ServiceHelper.stopService(processor);
079        }
080    
081        @Override
082        protected void doShutdown() throws Exception {
083            ServiceHelper.stopAndShutdownService(processor);
084            if (shutdownExecutorService) {
085                getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
086            }
087        }
088    
089        public CamelContext getCamelContext() {
090            return camelContext;
091        }
092    
093        public void process(Exchange exchange) throws Exception {
094            AsyncProcessorHelper.process(this, exchange);
095        }
096    
097        public boolean process(Exchange exchange, AsyncCallback callback) {
098            if (processor != null) {
099                // register callback
100                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronization());
101            }
102    
103            callback.done(true);
104            return true;
105        }
106    
107        /**
108         * Processes the exchange by the processors
109         *
110         * @param processor the processor
111         * @param exchange the exchange
112         */
113        protected static void doProcess(Processor processor, Exchange exchange) {
114            try {
115                processor.process(exchange);
116            } catch (Exception e) {
117                exchange.setException(e);
118            }
119        }
120    
121        /**
122         * Prepares the {@link Exchange} to send as onCompletion.
123         *
124         * @param exchange the current exchange
125         * @return the exchange to be routed in onComplete
126         */
127        protected Exchange prepareExchange(Exchange exchange) {
128            Exchange answer;
129    
130            // for asynchronous routing we must use a copy as we dont want it
131            // to cause side effects of the original exchange
132            // (the original thread will run in parallel)
133            answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
134            if (answer.hasOut()) {
135                // move OUT to IN (pipes and filters)
136                answer.setIn(answer.getOut());
137                answer.setOut(null);
138            }
139            // set MEP to InOnly as this wire tap is a fire and forget
140            answer.setPattern(ExchangePattern.InOnly);
141    
142            if (useOriginalBody) {
143                LOG.trace("Using the original IN message instead of current");
144    
145                Message original = exchange.getUnitOfWork().getOriginalInMessage();
146                answer.setIn(original);
147            }
148    
149            // add a header flag to indicate its a on completion exchange
150            answer.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE);
151    
152            return answer;
153        }
154    
155        private final class OnCompletionSynchronization extends SynchronizationAdapter implements Ordered {
156    
157            public int getOrder() {
158                // we want to be last
159                return Ordered.LOWEST;
160            }
161    
162            @Override
163            public void onComplete(final Exchange exchange) {
164                if (onFailureOnly) {
165                    return;
166                }
167    
168                if (onWhen != null && !onWhen.matches(exchange)) {
169                    // predicate did not match so do not route the onComplete
170                    return;
171                }
172    
173                // must use a copy as we dont want it to cause side effects of the original exchange
174                final Exchange copy = prepareExchange(exchange);
175    
176                executorService.submit(new Callable<Exchange>() {
177                    public Exchange call() throws Exception {
178                        LOG.debug("Processing onComplete: {}", copy);
179                        doProcess(processor, copy);
180                        return copy;
181                    }
182                });
183            }
184    
185            public void onFailure(final Exchange exchange) {
186                if (onCompleteOnly) {
187                    return;
188                }
189    
190                if (onWhen != null && !onWhen.matches(exchange)) {
191                    // predicate did not match so do not route the onComplete
192                    return;
193                }
194    
195                // must use a copy as we dont want it to cause side effects of the original exchange
196                final Exchange copy = prepareExchange(exchange);
197                // must remove exception otherwise onFailure routing will fail as well
198                // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
199                copy.setException(null);
200    
201                executorService.submit(new Callable<Exchange>() {
202                    public Exchange call() throws Exception {
203                        LOG.debug("Processing onFailure: {}", copy);
204                        doProcess(processor, copy);
205                        return null;
206                    }
207                });
208            }
209    
210            @Override
211            public String toString() {
212                if (!onCompleteOnly && !onFailureOnly) {
213                    return "onCompleteOrFailure";
214                } else if (onCompleteOnly) {
215                    return "onCompleteOnly";
216                } else {
217                    return "onFailureOnly";
218                }
219            }
220        }
221    
222        @Override
223        public String toString() {
224            return "OnCompletionProcessor[" + processor + "]";
225        }
226    
227        public String getTraceLabel() {
228            return "onCompletion";
229        }
230    }