001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.FailedToCreateConsumerException;
027    import org.apache.camel.LoggingLevel;
028    import org.apache.camel.PollingConsumerPollingStrategy;
029    import org.apache.camel.Processor;
030    import org.apache.camel.SuspendableService;
031    import org.apache.camel.spi.PollingConsumerPollStrategy;
032    import org.apache.camel.spi.ScheduledPollConsumerScheduler;
033    import org.apache.camel.spi.UriParam;
034    import org.apache.camel.util.IntrospectionSupport;
035    import org.apache.camel.util.ObjectHelper;
036    import org.apache.camel.util.ServiceHelper;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * A useful base class for any consumer which is polling based
042     */
043    public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
044        private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
045    
046        private ScheduledPollConsumerScheduler scheduler;
047        private ScheduledExecutorService scheduledExecutorService;
048    
049        // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
050        @UriParam
051        private boolean startScheduler = true;
052        @UriParam
053        private long initialDelay = 1000;
054        @UriParam
055        private long delay = 500;
056        @UriParam
057        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
058        @UriParam
059        private boolean useFixedDelay = true;
060        @UriParam
061        private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
062        @UriParam
063        private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
064        @UriParam
065        private boolean sendEmptyMessageWhenIdle;
066        @UriParam
067        private boolean greedy;
068        @UriParam
069        private int backoffMultiplier;
070        @UriParam
071        private int backoffIdleThreshold;
072        @UriParam
073        private int backoffErrorThreshold;
074        private Map<String, Object> schedulerProperties;
075    
076        // state during running
077        private volatile boolean polling;
078        private volatile int backoffCounter;
079        private volatile long idleCounter;
080        private volatile long errorCounter;
081    
082        public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
083            super(endpoint, processor);
084        }
085    
086        public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
087            super(endpoint, processor);
088            // we have been given an existing thread pool, so we should not manage its lifecycle
089            // so we should keep shutdownExecutor as false
090            this.scheduledExecutorService = scheduledExecutorService;
091            ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
092        }
093    
094        /**
095         * Invoked whenever we should be polled
096         */
097        public void run() {
098            // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
099            try {
100                // log starting
101                if (LoggingLevel.ERROR == runLoggingLevel) {
102                    LOG.error("Scheduled task started on:   {}", this.getEndpoint());
103                } else if (LoggingLevel.WARN == runLoggingLevel) {
104                    LOG.warn("Scheduled task started on:   {}", this.getEndpoint());
105                } else if (LoggingLevel.INFO == runLoggingLevel) {
106                    LOG.info("Scheduled task started on:   {}", this.getEndpoint());
107                } else if (LoggingLevel.DEBUG == runLoggingLevel) {
108                    LOG.debug("Scheduled task started on:   {}", this.getEndpoint());
109                } else {
110                    LOG.trace("Scheduled task started on:   {}", this.getEndpoint());
111                }
112    
113                // execute scheduled task
114                doRun();
115    
116                // log completed
117                if (LoggingLevel.ERROR == runLoggingLevel) {
118                    LOG.error("Scheduled task completed on: {}", this.getEndpoint());
119                } else if (LoggingLevel.WARN == runLoggingLevel) {
120                    LOG.warn("Scheduled task completed on: {}", this.getEndpoint());
121                } else if (LoggingLevel.INFO == runLoggingLevel) {
122                    LOG.info("Scheduled task completed on: {}", this.getEndpoint());
123                } else if (LoggingLevel.DEBUG == runLoggingLevel) {
124                    LOG.debug("Scheduled task completed on: {}", this.getEndpoint());
125                } else {
126                    LOG.trace("Scheduled task completed on: {}", this.getEndpoint());
127                }
128    
129            } catch (Error e) {
130                // must catch Error, to ensure the task is re-scheduled
131                LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e);
132            }
133        }
134    
135        private void doRun() {
136            if (isSuspended()) {
137                LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint());
138                return;
139            }
140    
141            // should we backoff if its enabled, and either the idle or error counter is > the threshold
142            if (backoffMultiplier > 0
143                    // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use
144                    && (idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE))
145                    || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) {
146                if (backoffCounter++ < backoffMultiplier) {
147                    // yes we should backoff
148                    if (idleCounter > 0) {
149                        LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", new Object[]{idleCounter, backoffCounter, backoffMultiplier});
150                    } else {
151                        LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", new Object[]{errorCounter, backoffCounter, backoffMultiplier});
152                    }
153                    return;
154                } else {
155                    // we are finished with backoff so reset counters
156                    idleCounter = 0;
157                    errorCounter = 0;
158                    backoffCounter = 0;
159                    LOG.trace("doRun() backoff finished, resetting counters.");
160                }
161            }
162    
163            int retryCounter = -1;
164            boolean done = false;
165            Throwable cause = null;
166            int polledMessages = 0;
167    
168            while (!done) {
169                try {
170                    cause = null;
171                    // eager assume we are done
172                    done = true;
173                    if (isPollAllowed()) {
174    
175                        if (retryCounter == -1) {
176                            LOG.trace("Starting to poll: {}", this.getEndpoint());
177                        } else {
178                            LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
179                        }
180    
181                        // mark we are polling which should also include the begin/poll/commit
182                        polling = true;
183                        try {
184                            boolean begin = pollStrategy.begin(this, getEndpoint());
185                            if (begin) {
186                                retryCounter++;
187                                polledMessages = poll();
188                                LOG.trace("Polled {} messages", polledMessages);
189    
190                                if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
191                                    // send an "empty" exchange
192                                    processEmptyMessage();
193                                }
194    
195                                pollStrategy.commit(this, getEndpoint(), polledMessages);
196    
197                                if (polledMessages > 0 && isGreedy()) {
198                                    done = false;
199                                    retryCounter = -1;
200                                    LOG.trace("Greedy polling after processing {} messages", polledMessages);
201                                }
202                            } else {
203                                LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
204                            }
205                        } finally {
206                            polling = false;
207                        }
208                    }
209    
210                    LOG.trace("Finished polling: {}", this.getEndpoint());
211                } catch (Exception e) {
212                    try {
213                        boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
214                        if (retry) {
215                            // do not set cause as we retry
216                            done = false;
217                        } else {
218                            cause = e;
219                            done = true;
220                        }
221                    } catch (Throwable t) {
222                        cause = t;
223                        done = true;
224                    }
225                } catch (Throwable t) {
226                    cause = t;
227                    done = true;
228                }
229    
230                if (cause != null && isRunAllowed()) {
231                    // let exception handler deal with the caused exception
232                    // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
233                    try {
234                        getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
235                                + ". Will try again at next poll", cause);
236                    } catch (Throwable e) {
237                        LOG.warn("Error handling exception. This exception will be ignored.", e);
238                    }
239                }
240            }
241    
242            if (cause != null) {
243                idleCounter = 0;
244                errorCounter++;
245            } else {
246                idleCounter = polledMessages == 0 ? ++idleCounter : 0;
247                errorCounter = 0;
248            }
249            LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter);
250    
251            // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
252        }
253    
254        /**
255         * No messages to poll so send an empty message instead.
256         *
257         * @throws Exception is thrown if error processing the empty message.
258         */
259        protected void processEmptyMessage() throws Exception {
260            Exchange exchange = getEndpoint().createExchange();
261            log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
262            getProcessor().process(exchange);
263        }
264    
265        // Properties
266        // -------------------------------------------------------------------------
267    
268        protected boolean isPollAllowed() {
269            return isRunAllowed() && !isSuspended();
270        }
271    
272        /**
273         * Whether polling is currently in progress
274         */
275        protected boolean isPolling() {
276            return polling;
277        }
278    
279        public ScheduledPollConsumerScheduler getScheduler() {
280            return scheduler;
281        }
282    
283        /**
284         * Sets a custom scheduler to use for scheduling running this task (poll).
285         *
286         * @param scheduler the custom scheduler
287         */
288        public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
289            this.scheduler = scheduler;
290        }
291    
292        public Map<String, Object> getSchedulerProperties() {
293            return schedulerProperties;
294        }
295    
296        /**
297         * Additional properties to configure on the custom scheduler.
298         */
299        public void setSchedulerProperties(Map<String, Object> schedulerProperties) {
300            this.schedulerProperties = schedulerProperties;
301        }
302    
303        public long getInitialDelay() {
304            return initialDelay;
305        }
306    
307        public void setInitialDelay(long initialDelay) {
308            this.initialDelay = initialDelay;
309        }
310    
311        public long getDelay() {
312            return delay;
313        }
314    
315        public void setDelay(long delay) {
316            this.delay = delay;
317        }
318    
319        public TimeUnit getTimeUnit() {
320            return timeUnit;
321        }
322    
323        /**
324         * Sets the time unit to use.
325         * <p/>
326         * Notice that both {@link #getDelay()} and {@link #getInitialDelay()} are using
327         * the same time unit. So if you change this value, then take into account that the
328         * default value of {@link #getInitialDelay()} is 1000. So you may to adjust this value accordingly.
329         *
330         * @param timeUnit the time unit.
331         */
332        public void setTimeUnit(TimeUnit timeUnit) {
333            this.timeUnit = timeUnit;
334        }
335    
336        public boolean isUseFixedDelay() {
337            return useFixedDelay;
338        }
339    
340        public void setUseFixedDelay(boolean useFixedDelay) {
341            this.useFixedDelay = useFixedDelay;
342        }
343    
344        public LoggingLevel getRunLoggingLevel() {
345            return runLoggingLevel;
346        }
347    
348        public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
349            this.runLoggingLevel = runLoggingLevel;
350        }
351    
352        public PollingConsumerPollStrategy getPollStrategy() {
353            return pollStrategy;
354        }
355    
356        public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
357            this.pollStrategy = pollStrategy;
358        }
359    
360        public boolean isStartScheduler() {
361            return startScheduler;
362        }
363    
364        /**
365         * Sets whether the scheduler should be started when this consumer starts.
366         * <p/>
367         * This option is default true.
368         *
369         * @param startScheduler whether to start scheduler
370         */
371        public void setStartScheduler(boolean startScheduler) {
372            this.startScheduler = startScheduler;
373        }
374    
375        public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
376            this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
377        }
378    
379        public boolean isSendEmptyMessageWhenIdle() {
380            return sendEmptyMessageWhenIdle;
381        }
382    
383        public boolean isGreedy() {
384            return greedy;
385        }
386    
387        /**
388         * If greedy then a poll is executed immediate after a previous poll that polled 1 or more messages.
389         */
390        public void setGreedy(boolean greedy) {
391            this.greedy = greedy;
392        }
393    
394        public int getBackoffCounter() {
395            return backoffCounter;
396        }
397    
398        public int getBackoffMultiplier() {
399            return backoffMultiplier;
400        }
401    
402        public void setBackoffMultiplier(int backoffMultiplier) {
403            this.backoffMultiplier = backoffMultiplier;
404        }
405    
406        public int getBackoffIdleThreshold() {
407            return backoffIdleThreshold;
408        }
409    
410        public void setBackoffIdleThreshold(int backoffIdleThreshold) {
411            this.backoffIdleThreshold = backoffIdleThreshold;
412        }
413    
414        public int getBackoffErrorThreshold() {
415            return backoffErrorThreshold;
416        }
417    
418        public void setBackoffErrorThreshold(int backoffErrorThreshold) {
419            this.backoffErrorThreshold = backoffErrorThreshold;
420        }
421    
422        public ScheduledExecutorService getScheduledExecutorService() {
423            return scheduledExecutorService;
424        }
425    
426        /**
427         * Whether the scheduler has been started.
428         * <p/>
429         * The scheduler can be started with the {@link #startScheduler()} method.
430         *
431         * @return <tt>true</tt> if started, <tt>false</tt> if not.
432         */
433        public boolean isSchedulerStarted() {
434            return scheduler.isSchedulerStarted();
435        }
436    
437        /**
438         * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
439         * <p/>
440         * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
441         * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
442         * when the consumer is started/stopped etc.)
443         *
444         * @param scheduledExecutorService the custom thread pool to use
445         */
446        public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
447            this.scheduledExecutorService = scheduledExecutorService;
448        }
449    
450        // Implementation methods
451        // -------------------------------------------------------------------------
452    
453        /**
454         * The polling method which is invoked periodically to poll this consumer
455         *
456         * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
457         * @throws Exception can be thrown if an exception occurred during polling
458         */
459        protected abstract int poll() throws Exception;
460    
461        @Override
462        protected void doStart() throws Exception {
463            super.doStart();
464    
465            // validate that if backoff multiplier is in use, the threshold values is set correclty
466            if (backoffMultiplier > 0) {
467                if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) {
468                    throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured to a positive value when using backoffMultiplier");
469                }
470                LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", new Object[]{backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()});
471            }
472    
473            if (scheduler == null) {
474                scheduler = new DefaultScheduledPollConsumerScheduler();
475            }
476            scheduler.setCamelContext(getEndpoint().getCamelContext());
477            scheduler.onInit(this);
478            scheduler.scheduleTask(this);
479    
480            // configure scheduler with options from this consumer
481            Map<String, Object> properties = new HashMap<String, Object>();
482            IntrospectionSupport.getProperties(this, properties, null);
483            IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties);
484            if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
485                // need to use a copy in case the consumer is restarted so we keep the properties
486                Map<String, Object> copy = new HashMap<String, Object>(schedulerProperties);
487                IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, copy);
488                if (copy.size() > 0) {
489                    throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size()
490                            + " scheduler parameters that couldn't be set on the endpoint."
491                            + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
492                            + " Unknown parameters=[" + copy + "]");
493                }
494            }
495    
496            ObjectHelper.notNull(scheduler, "scheduler", this);
497            ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
498    
499            ServiceHelper.startService(scheduler);
500    
501            if (isStartScheduler()) {
502                startScheduler();
503            }
504        }
505    
506        /**
507         * Starts the scheduler.
508         * <p/>
509         * If the scheduler is already started, then this is a noop method call.
510         */
511        public void startScheduler() {
512            scheduler.startScheduler();
513        }
514    
515        @Override
516        protected void doStop() throws Exception {
517            ServiceHelper.stopService(scheduler);
518    
519            // clear counters
520            backoffCounter = 0;
521            idleCounter = 0;
522            errorCounter = 0;
523    
524            super.doStop();
525        }
526    
527        @Override
528        protected void doShutdown() throws Exception {
529            ServiceHelper.stopAndShutdownServices(scheduler);
530            super.doShutdown();
531        }
532    
533        @Override
534        protected void doSuspend() throws Exception {
535            // dont stop/cancel the future task since we just check in the run method
536        }
537    
538        @Override
539        public void onInit() throws Exception {
540            // make sure the scheduler is starter
541            startScheduler = true;
542        }
543    
544        @Override
545        public long beforePoll(long timeout) throws Exception {
546            LOG.trace("Before poll {}", getEndpoint());
547            // resume or start our self
548            if (!ServiceHelper.resumeService(this)) {
549                ServiceHelper.startService(this);
550            }
551    
552            // ensure at least timeout is as long as one poll delay
553            return Math.max(timeout, getDelay());
554        }
555    
556        @Override
557        public void afterPoll() throws Exception {
558            LOG.trace("After poll {}", getEndpoint());
559            // suspend or stop our self
560            if (!ServiceHelper.suspendService(this)) {
561                ServiceHelper.stopService(this);
562            }
563        }
564    
565    }