001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Rejectable;
028    import org.apache.camel.ThreadPoolRejectedPolicy;
029    import org.apache.camel.support.ServiceSupport;
030    import org.apache.camel.util.AsyncProcessorHelper;
031    import org.apache.camel.util.ObjectHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s
037     * using the asynchronous routing engine.
038     * <p/>
039     * <b>Notice:</b> For transacted routes then this {@link ThreadsProcessor} is not in use, as we want to
040     * process messages using the same thread to support all work done in the same transaction. The reason
041     * is that the transaction manager that orchestrate the transaction, requires all the work to be done
042     * on the same thread.
043     * <p/>
044     * Pay attention to how this processor handles rejected tasks.
045     * <ul>
046     * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception,
047     * and marked to stop continue routing.
048     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li>
049     * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set).
050     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li>
051     * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set).
052     * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.
053     * And the current exchange will be added to the task queue.</li>
054     * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread
055     * will not be free to process a new exchange, as its processing the current exchange.</li>
056     * </ul>
057     */
058    public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor {
059    
060        private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class);
061        private final CamelContext camelContext;
062        private final ExecutorService executorService;
063        private volatile boolean shutdownExecutorService;
064        private final AtomicBoolean shutdown = new AtomicBoolean(true);
065        private boolean callerRunsWhenRejected = true;
066        private ThreadPoolRejectedPolicy rejectedPolicy;
067    
068        private final class ProcessCall implements Runnable, Rejectable {
069            private final Exchange exchange;
070            private final AsyncCallback callback;
071    
072            public ProcessCall(Exchange exchange, AsyncCallback callback) {
073                this.exchange = exchange;
074                this.callback = callback;
075            }
076    
077            @Override
078            public void run() {
079                LOG.trace("Continue routing exchange {} ", exchange);
080                if (shutdown.get()) {
081                    exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
082                }
083                callback.done(false);
084            }
085    
086            @Override
087            public void reject() {
088                // abort should mark the exchange with an rejected exception
089                boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
090                if (abort) {
091                    exchange.setException(new RejectedExecutionException());
092                }
093    
094                LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
095                // we should not continue routing, and no redelivery should be performed
096                exchange.setProperty(Exchange.ROUTE_STOP, true);
097                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
098    
099                if (shutdown.get()) {
100                    exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
101                }
102                callback.done(false);
103            }
104    
105            @Override
106            public String toString() {
107                return "ProcessCall[" + exchange + "]";
108            }
109        }
110    
111        public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
112            ObjectHelper.notNull(camelContext, "camelContext");
113            ObjectHelper.notNull(executorService, "executorService");
114            this.camelContext = camelContext;
115            this.executorService = executorService;
116            this.shutdownExecutorService = shutdownExecutorService;
117        }
118    
119        public void process(final Exchange exchange) throws Exception {
120            AsyncProcessorHelper.process(this, exchange);
121        }
122    
123        public boolean process(Exchange exchange, AsyncCallback callback) {
124            if (shutdown.get()) {
125                throw new IllegalStateException("ThreadsProcessor is not running.");
126            }
127    
128            // we cannot execute this asynchronously for transacted exchanges, as the transaction manager doesn't support
129            // using different threads in the same transaction
130            if (exchange.isTransacted()) {
131                LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
132                callback.done(true);
133                return true;
134            }
135    
136            ProcessCall call = new ProcessCall(exchange, callback);
137            try {
138                LOG.trace("Submitting task {}", call);
139                executorService.submit(call);
140                // tell Camel routing engine we continue routing asynchronous
141                return false;
142            } catch (RejectedExecutionException e) {
143                boolean callerRuns = isCallerRunsWhenRejected();
144                if (!callerRuns) {
145                    exchange.setException(e);
146                }
147    
148                LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
149                if (shutdown.get()) {
150                    exchange.setException(new RejectedExecutionException());
151                }
152                callback.done(true);
153                return true;
154            }
155        }
156    
157        public boolean isCallerRunsWhenRejected() {
158            return callerRunsWhenRejected;
159        }
160    
161        public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
162            this.callerRunsWhenRejected = callerRunsWhenRejected;
163        }
164    
165        public ThreadPoolRejectedPolicy getRejectedPolicy() {
166            return rejectedPolicy;
167        }
168    
169        public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
170            this.rejectedPolicy = rejectedPolicy;
171        }
172    
173        public String toString() {
174            return "Threads";
175        }
176    
177        protected void doStart() throws Exception {
178            shutdown.set(false);
179        }
180    
181        protected void doStop() throws Exception {
182            shutdown.set(true);
183        }
184    
185        protected void doShutdown() throws Exception {
186            if (shutdownExecutorService) {
187                camelContext.getExecutorServiceManager().shutdownNow(executorService);
188            }
189            super.doShutdown();
190        }
191    
192    }