001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Locale;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.ScheduledFuture;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Consumer;
026    import org.apache.camel.spi.ScheduledPollConsumerScheduler;
027    import org.apache.camel.util.ObjectHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler {
032    
033        private static final Logger LOG = LoggerFactory.getLogger(DefaultScheduledPollConsumerScheduler.class);
034        private CamelContext camelContext;
035        private Consumer consumer;
036        private ScheduledExecutorService scheduledExecutorService;
037        private boolean shutdownExecutor;
038        private volatile ScheduledFuture<?> future;
039        private Runnable task;
040    
041        private long initialDelay = 1000;
042        private long delay = 500;
043        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
044        private boolean useFixedDelay = true;
045    
046        public CamelContext getCamelContext() {
047            return camelContext;
048        }
049    
050        public void setCamelContext(CamelContext camelContext) {
051            this.camelContext = camelContext;
052        }
053    
054        public long getInitialDelay() {
055            return initialDelay;
056        }
057    
058        public void setInitialDelay(long initialDelay) {
059            this.initialDelay = initialDelay;
060        }
061    
062        public long getDelay() {
063            return delay;
064        }
065    
066        public void setDelay(long delay) {
067            this.delay = delay;
068        }
069    
070        public TimeUnit getTimeUnit() {
071            return timeUnit;
072        }
073    
074        public void setTimeUnit(TimeUnit timeUnit) {
075            this.timeUnit = timeUnit;
076        }
077    
078        public boolean isUseFixedDelay() {
079            return useFixedDelay;
080        }
081    
082        public void setUseFixedDelay(boolean useFixedDelay) {
083            this.useFixedDelay = useFixedDelay;
084        }
085    
086        public ScheduledExecutorService getScheduledExecutorService() {
087            return scheduledExecutorService;
088        }
089    
090        public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
091            this.scheduledExecutorService = scheduledExecutorService;
092        }
093    
094        @Override
095        public void onInit(Consumer consumer) {
096            this.consumer = consumer;
097        }
098    
099        @Override
100        public void scheduleTask(Runnable task) {
101            this.task = task;
102        }
103    
104        @Override
105        public void unscheduleTask() {
106            if (future != null) {
107                future.cancel(false);
108            }
109        }
110    
111        @Override
112        public void startScheduler() {
113            // only schedule task if we have not already done that
114            if (future == null) {
115                if (isUseFixedDelay()) {
116                    if (LOG.isDebugEnabled()) {
117                        LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
118                                new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
119                    }
120                    future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit());
121                } else {
122                    if (LOG.isDebugEnabled()) {
123                        LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
124                                new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
125                    }
126                    future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit());
127                }
128            }
129        }
130    
131        @Override
132        public boolean isSchedulerStarted() {
133            return future != null;
134        }
135    
136        @Override
137        protected void doStart() throws Exception {
138            ObjectHelper.notNull(consumer, "Consumer", this);
139            ObjectHelper.notNull(camelContext, "CamelContext", this);
140            ObjectHelper.notNull(task, "Task", this);
141    
142            // if no existing executor provided, then create a new thread pool ourselves
143            if (scheduledExecutorService == null) {
144                // we only need one thread in the pool to schedule this task
145                this.scheduledExecutorService = getCamelContext().getExecutorServiceManager()
146                        .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri());
147                // and we should shutdown the thread pool when no longer needed
148                this.shutdownExecutor = true;
149            }
150        }
151    
152        @Override
153        protected void doStop() throws Exception {
154            if (future != null) {
155                LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future);
156                future.cancel(false);
157                future = null;
158            }
159        }
160    
161        @Override
162        protected void doShutdown() throws Exception {
163            if (shutdownExecutor && scheduledExecutorService != null) {
164                getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
165                scheduledExecutorService = null;
166                future = null;
167            }
168        }
169    }