001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.RejectedExecutionException;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicInteger;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.util.ObjectHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A useful base class for any processor which provides some kind of throttling
034     * or delayed processing.
035     * <p/>
036     * This implementation will block while waiting.
037     * 
038     * @version 
039     */
040    public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
041        protected final Logger log = LoggerFactory.getLogger(getClass());
042        private final CamelContext camelContext;
043        private final ScheduledExecutorService executorService;
044        private final boolean shutdownExecutorService;
045        private boolean asyncDelayed;
046        private boolean callerRunsWhenRejected = true;
047        private final AtomicInteger delayedCount = new AtomicInteger(0);
048    
049        // TODO: Add option to cancel tasks on shutdown so we can stop fast
050    
051        private final class ProcessCall implements Runnable {
052            private final Exchange exchange;
053            private final AsyncCallback callback;
054    
055            public ProcessCall(Exchange exchange, AsyncCallback callback) {
056                this.exchange = exchange;
057                this.callback = callback;
058            }
059    
060            public void run() {
061                // we are running now so decrement the counter
062                delayedCount.decrementAndGet();
063    
064                log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
065                if (!isRunAllowed()) {
066                    exchange.setException(new RejectedExecutionException("Run is not allowed"));
067                }
068    
069                // process the exchange now that we woke up
070                DelayProcessorSupport.this.processor.process(exchange, new AsyncCallback() {
071                    @Override
072                    public void done(boolean doneSync) {
073                        log.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId());
074                        // we must done the callback from this async callback as well, to ensure callback is done correctly
075                        // must invoke done on callback with false, as that is what the original caller would
076                        // expect as we returned false in the process method
077                        callback.done(false);
078                    }
079                });
080            }
081        }
082    
083        public DelayProcessorSupport(CamelContext camelContext, Processor processor) {
084            this(camelContext, processor, null, false);
085        }
086    
087        public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) {
088            super(processor);
089            this.camelContext = camelContext;
090            this.executorService = executorService;
091            this.shutdownExecutorService = shutdownExecutorService;
092        }
093    
094        @Override
095        public boolean process(Exchange exchange, AsyncCallback callback) {
096            if (!isRunAllowed()) {
097                exchange.setException(new RejectedExecutionException("Run is not allowed"));
098                callback.done(true);
099                return true;
100            }
101    
102            // calculate delay and wait
103            long delay;
104            try {
105                delay = calculateDelay(exchange);
106                if (delay <= 0) {
107                    // no delay then continue routing
108                    log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
109                    return processor.process(exchange, callback);
110                }
111            } catch (Throwable e) {
112                exchange.setException(e);
113                callback.done(true);
114                return true;
115            }
116    
117            if (!isAsyncDelayed() || exchange.isTransacted()) {
118                // use synchronous delay (also required if using transactions)
119                try {
120                    delay(delay, exchange);
121                    // then continue routing
122                    return processor.process(exchange, callback);
123                } catch (Exception e) {
124                    // exception occurred so we are done
125                    exchange.setException(e);
126                    callback.done(true);
127                    return true;
128                }
129            } else {
130                // asynchronous delay so schedule a process call task
131                // and increment the counter (we decrement the counter when we run the ProcessCall)
132                delayedCount.incrementAndGet();
133                ProcessCall call = new ProcessCall(exchange, callback);
134                try {
135                    log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
136                            delay, exchange.getExchangeId());
137                    executorService.schedule(call, delay, TimeUnit.MILLISECONDS);
138                    // tell Camel routing engine we continue routing asynchronous
139                    return false;
140                } catch (RejectedExecutionException e) {
141                    // we were not allowed to run the ProcessCall, so need to decrement the counter here
142                    delayedCount.decrementAndGet();
143                    if (isCallerRunsWhenRejected()) {
144                        if (!isRunAllowed()) {
145                            exchange.setException(new RejectedExecutionException());
146                        } else {
147                            log.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
148                            // let caller run by processing
149                            try {
150                                delay(delay, exchange);
151                            } catch (InterruptedException ie) {
152                                exchange.setException(ie);
153                            }
154                            // then continue routing
155                            return processor.process(exchange, callback);
156                        }
157                    } else {
158                        exchange.setException(e);
159                    }
160                    // caller don't run the task so we are done
161                    callback.done(true);
162                    return true;
163                }
164            }
165        }
166    
167        public boolean isAsyncDelayed() {
168            return asyncDelayed;
169        }
170    
171        public void setAsyncDelayed(boolean asyncDelayed) {
172            this.asyncDelayed = asyncDelayed;
173        }
174    
175        public boolean isCallerRunsWhenRejected() {
176            return callerRunsWhenRejected;
177        }
178    
179        public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
180            this.callerRunsWhenRejected = callerRunsWhenRejected;
181        }
182    
183        protected abstract long calculateDelay(Exchange exchange);
184    
185        /**
186         * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit)
187         */
188        public int getDelayedCount() {
189            return delayedCount.get();
190        }
191    
192        /**
193         * Delays the given time before continuing.
194         * <p/>
195         * This implementation will block while waiting
196         * 
197         * @param delay the delay time in millis
198         * @param exchange the exchange being processed
199         */
200        protected void delay(long delay, Exchange exchange) throws InterruptedException {
201            // only run is we are started
202            if (!isRunAllowed()) {
203                return;
204            }
205    
206            if (delay < 0) {
207                return;
208            } else {
209                try {
210                    // keep track on delayer counter while we sleep
211                    delayedCount.incrementAndGet();
212                    sleep(delay);
213                } catch (InterruptedException e) {
214                    handleSleepInterruptedException(e, exchange);
215                } finally {
216                    delayedCount.decrementAndGet();
217                }
218            }
219        }
220    
221        /**
222         * Called when a sleep is interrupted; allows derived classes to handle this case differently
223         */
224        protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException {
225            if (log.isDebugEnabled()) {
226                log.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
227            }
228            Thread.currentThread().interrupt();
229            throw e;
230        }
231    
232        protected long currentSystemTime() {
233            return System.currentTimeMillis();
234        }
235    
236        private void sleep(long delay) throws InterruptedException {
237            if (delay <= 0) {
238                return;
239            }
240            log.trace("Sleeping for: {} millis", delay);
241            Thread.sleep(delay);
242        }
243    
244        @Override
245        protected void doStart() throws Exception {
246            if (isAsyncDelayed()) {
247                ObjectHelper.notNull(executorService, "executorService", this);
248            }
249            super.doStart();
250        }
251    
252        @Override
253        protected void doShutdown() throws Exception {
254            if (shutdownExecutorService && executorService != null) {
255                camelContext.getExecutorServiceManager().shutdownNow(executorService);
256            }
257            super.doShutdown();
258        }
259    }