001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.Locale; 020 import java.util.concurrent.ScheduledExecutorService; 021 import java.util.concurrent.ScheduledFuture; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Consumer; 026 import org.apache.camel.spi.ScheduledPollConsumerScheduler; 027 import org.apache.camel.util.ObjectHelper; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler { 032 033 private static final Logger LOG = LoggerFactory.getLogger(DefaultScheduledPollConsumerScheduler.class); 034 private CamelContext camelContext; 035 private Consumer consumer; 036 private ScheduledExecutorService scheduledExecutorService; 037 private boolean shutdownExecutor; 038 private volatile ScheduledFuture<?> future; 039 private Runnable task; 040 041 private long initialDelay = 1000; 042 private long delay = 500; 043 private TimeUnit timeUnit = TimeUnit.MILLISECONDS; 044 private boolean useFixedDelay = true; 045 046 public CamelContext getCamelContext() { 047 return camelContext; 048 } 049 050 public void setCamelContext(CamelContext camelContext) { 051 this.camelContext = camelContext; 052 } 053 054 public long getInitialDelay() { 055 return initialDelay; 056 } 057 058 public void setInitialDelay(long initialDelay) { 059 this.initialDelay = initialDelay; 060 } 061 062 public long getDelay() { 063 return delay; 064 } 065 066 public void setDelay(long delay) { 067 this.delay = delay; 068 } 069 070 public TimeUnit getTimeUnit() { 071 return timeUnit; 072 } 073 074 public void setTimeUnit(TimeUnit timeUnit) { 075 this.timeUnit = timeUnit; 076 } 077 078 public boolean isUseFixedDelay() { 079 return useFixedDelay; 080 } 081 082 public void setUseFixedDelay(boolean useFixedDelay) { 083 this.useFixedDelay = useFixedDelay; 084 } 085 086 public ScheduledExecutorService getScheduledExecutorService() { 087 return scheduledExecutorService; 088 } 089 090 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { 091 this.scheduledExecutorService = scheduledExecutorService; 092 } 093 094 @Override 095 public void onInit(Consumer consumer) { 096 this.consumer = consumer; 097 } 098 099 @Override 100 public void scheduleTask(Runnable task) { 101 this.task = task; 102 } 103 104 @Override 105 public void unscheduleTask() { 106 if (future != null) { 107 future.cancel(false); 108 } 109 } 110 111 @Override 112 public void startScheduler() { 113 // only schedule task if we have not already done that 114 if (future == null) { 115 if (isUseFixedDelay()) { 116 if (LOG.isDebugEnabled()) { 117 LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", 118 new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); 119 } 120 future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit()); 121 } else { 122 if (LOG.isDebugEnabled()) { 123 LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}", 124 new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); 125 } 126 future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit()); 127 } 128 } 129 } 130 131 @Override 132 public boolean isSchedulerStarted() { 133 return future != null; 134 } 135 136 @Override 137 protected void doStart() throws Exception { 138 ObjectHelper.notNull(consumer, "Consumer", this); 139 ObjectHelper.notNull(camelContext, "CamelContext", this); 140 ObjectHelper.notNull(task, "Task", this); 141 142 // if no existing executor provided, then create a new thread pool ourselves 143 if (scheduledExecutorService == null) { 144 // we only need one thread in the pool to schedule this task 145 this.scheduledExecutorService = getCamelContext().getExecutorServiceManager() 146 .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri()); 147 // and we should shutdown the thread pool when no longer needed 148 this.shutdownExecutor = true; 149 } 150 } 151 152 @Override 153 protected void doStop() throws Exception { 154 if (future != null) { 155 LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future); 156 future.cancel(false); 157 future = null; 158 } 159 } 160 161 @Override 162 protected void doShutdown() throws Exception { 163 if (shutdownExecutor && scheduledExecutorService != null) { 164 getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); 165 scheduledExecutorService = null; 166 future = null; 167 } 168 } 169 }