001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.ScheduledExecutorService; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.Endpoint; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.FailedToCreateConsumerException; 027 import org.apache.camel.LoggingLevel; 028 import org.apache.camel.PollingConsumerPollingStrategy; 029 import org.apache.camel.Processor; 030 import org.apache.camel.SuspendableService; 031 import org.apache.camel.spi.PollingConsumerPollStrategy; 032 import org.apache.camel.spi.ScheduledPollConsumerScheduler; 033 import org.apache.camel.spi.UriParam; 034 import org.apache.camel.util.IntrospectionSupport; 035 import org.apache.camel.util.ObjectHelper; 036 import org.apache.camel.util.ServiceHelper; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * A useful base class for any consumer which is polling based 042 */ 043 public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { 044 private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); 045 046 private ScheduledPollConsumerScheduler scheduler; 047 private ScheduledExecutorService scheduledExecutorService; 048 049 // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties 050 @UriParam 051 private boolean startScheduler = true; 052 @UriParam 053 private long initialDelay = 1000; 054 @UriParam 055 private long delay = 500; 056 @UriParam 057 private TimeUnit timeUnit = TimeUnit.MILLISECONDS; 058 @UriParam 059 private boolean useFixedDelay = true; 060 @UriParam 061 private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); 062 @UriParam 063 private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; 064 @UriParam 065 private boolean sendEmptyMessageWhenIdle; 066 @UriParam 067 private boolean greedy; 068 @UriParam 069 private int backoffMultiplier; 070 @UriParam 071 private int backoffIdleThreshold; 072 @UriParam 073 private int backoffErrorThreshold; 074 private Map<String, Object> schedulerProperties; 075 076 // state during running 077 private volatile boolean polling; 078 private volatile int backoffCounter; 079 private volatile long idleCounter; 080 private volatile long errorCounter; 081 082 public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { 083 super(endpoint, processor); 084 } 085 086 public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) { 087 super(endpoint, processor); 088 // we have been given an existing thread pool, so we should not manage its lifecycle 089 // so we should keep shutdownExecutor as false 090 this.scheduledExecutorService = scheduledExecutorService; 091 ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService"); 092 } 093 094 /** 095 * Invoked whenever we should be polled 096 */ 097 public void run() { 098 // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread 099 try { 100 // log starting 101 if (LoggingLevel.ERROR == runLoggingLevel) { 102 LOG.error("Scheduled task started on: {}", this.getEndpoint()); 103 } else if (LoggingLevel.WARN == runLoggingLevel) { 104 LOG.warn("Scheduled task started on: {}", this.getEndpoint()); 105 } else if (LoggingLevel.INFO == runLoggingLevel) { 106 LOG.info("Scheduled task started on: {}", this.getEndpoint()); 107 } else if (LoggingLevel.DEBUG == runLoggingLevel) { 108 LOG.debug("Scheduled task started on: {}", this.getEndpoint()); 109 } else { 110 LOG.trace("Scheduled task started on: {}", this.getEndpoint()); 111 } 112 113 // execute scheduled task 114 doRun(); 115 116 // log completed 117 if (LoggingLevel.ERROR == runLoggingLevel) { 118 LOG.error("Scheduled task completed on: {}", this.getEndpoint()); 119 } else if (LoggingLevel.WARN == runLoggingLevel) { 120 LOG.warn("Scheduled task completed on: {}", this.getEndpoint()); 121 } else if (LoggingLevel.INFO == runLoggingLevel) { 122 LOG.info("Scheduled task completed on: {}", this.getEndpoint()); 123 } else if (LoggingLevel.DEBUG == runLoggingLevel) { 124 LOG.debug("Scheduled task completed on: {}", this.getEndpoint()); 125 } else { 126 LOG.trace("Scheduled task completed on: {}", this.getEndpoint()); 127 } 128 129 } catch (Error e) { 130 // must catch Error, to ensure the task is re-scheduled 131 LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e); 132 } 133 } 134 135 private void doRun() { 136 if (isSuspended()) { 137 LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint()); 138 return; 139 } 140 141 // should we backoff if its enabled, and either the idle or error counter is > the threshold 142 if (backoffMultiplier > 0 143 // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use 144 && (idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE)) 145 || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { 146 if (backoffCounter++ < backoffMultiplier) { 147 // yes we should backoff 148 if (idleCounter > 0) { 149 LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", new Object[]{idleCounter, backoffCounter, backoffMultiplier}); 150 } else { 151 LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", new Object[]{errorCounter, backoffCounter, backoffMultiplier}); 152 } 153 return; 154 } else { 155 // we are finished with backoff so reset counters 156 idleCounter = 0; 157 errorCounter = 0; 158 backoffCounter = 0; 159 LOG.trace("doRun() backoff finished, resetting counters."); 160 } 161 } 162 163 int retryCounter = -1; 164 boolean done = false; 165 Throwable cause = null; 166 int polledMessages = 0; 167 168 while (!done) { 169 try { 170 cause = null; 171 // eager assume we are done 172 done = true; 173 if (isPollAllowed()) { 174 175 if (retryCounter == -1) { 176 LOG.trace("Starting to poll: {}", this.getEndpoint()); 177 } else { 178 LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); 179 } 180 181 // mark we are polling which should also include the begin/poll/commit 182 polling = true; 183 try { 184 boolean begin = pollStrategy.begin(this, getEndpoint()); 185 if (begin) { 186 retryCounter++; 187 polledMessages = poll(); 188 LOG.trace("Polled {} messages", polledMessages); 189 190 if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { 191 // send an "empty" exchange 192 processEmptyMessage(); 193 } 194 195 pollStrategy.commit(this, getEndpoint(), polledMessages); 196 197 if (polledMessages > 0 && isGreedy()) { 198 done = false; 199 retryCounter = -1; 200 LOG.trace("Greedy polling after processing {} messages", polledMessages); 201 } 202 } else { 203 LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); 204 } 205 } finally { 206 polling = false; 207 } 208 } 209 210 LOG.trace("Finished polling: {}", this.getEndpoint()); 211 } catch (Exception e) { 212 try { 213 boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e); 214 if (retry) { 215 // do not set cause as we retry 216 done = false; 217 } else { 218 cause = e; 219 done = true; 220 } 221 } catch (Throwable t) { 222 cause = t; 223 done = true; 224 } 225 } catch (Throwable t) { 226 cause = t; 227 done = true; 228 } 229 230 if (cause != null && isRunAllowed()) { 231 // let exception handler deal with the caused exception 232 // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown 233 try { 234 getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint() 235 + ". Will try again at next poll", cause); 236 } catch (Throwable e) { 237 LOG.warn("Error handling exception. This exception will be ignored.", e); 238 } 239 } 240 } 241 242 if (cause != null) { 243 idleCounter = 0; 244 errorCounter++; 245 } else { 246 idleCounter = polledMessages == 0 ? ++idleCounter : 0; 247 errorCounter = 0; 248 } 249 LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter); 250 251 // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread 252 } 253 254 /** 255 * No messages to poll so send an empty message instead. 256 * 257 * @throws Exception is thrown if error processing the empty message. 258 */ 259 protected void processEmptyMessage() throws Exception { 260 Exchange exchange = getEndpoint().createExchange(); 261 log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint()); 262 getProcessor().process(exchange); 263 } 264 265 // Properties 266 // ------------------------------------------------------------------------- 267 268 protected boolean isPollAllowed() { 269 return isRunAllowed() && !isSuspended(); 270 } 271 272 /** 273 * Whether polling is currently in progress 274 */ 275 protected boolean isPolling() { 276 return polling; 277 } 278 279 public ScheduledPollConsumerScheduler getScheduler() { 280 return scheduler; 281 } 282 283 /** 284 * Sets a custom scheduler to use for scheduling running this task (poll). 285 * 286 * @param scheduler the custom scheduler 287 */ 288 public void setScheduler(ScheduledPollConsumerScheduler scheduler) { 289 this.scheduler = scheduler; 290 } 291 292 public Map<String, Object> getSchedulerProperties() { 293 return schedulerProperties; 294 } 295 296 /** 297 * Additional properties to configure on the custom scheduler. 298 */ 299 public void setSchedulerProperties(Map<String, Object> schedulerProperties) { 300 this.schedulerProperties = schedulerProperties; 301 } 302 303 public long getInitialDelay() { 304 return initialDelay; 305 } 306 307 public void setInitialDelay(long initialDelay) { 308 this.initialDelay = initialDelay; 309 } 310 311 public long getDelay() { 312 return delay; 313 } 314 315 public void setDelay(long delay) { 316 this.delay = delay; 317 } 318 319 public TimeUnit getTimeUnit() { 320 return timeUnit; 321 } 322 323 /** 324 * Sets the time unit to use. 325 * <p/> 326 * Notice that both {@link #getDelay()} and {@link #getInitialDelay()} are using 327 * the same time unit. So if you change this value, then take into account that the 328 * default value of {@link #getInitialDelay()} is 1000. So you may to adjust this value accordingly. 329 * 330 * @param timeUnit the time unit. 331 */ 332 public void setTimeUnit(TimeUnit timeUnit) { 333 this.timeUnit = timeUnit; 334 } 335 336 public boolean isUseFixedDelay() { 337 return useFixedDelay; 338 } 339 340 public void setUseFixedDelay(boolean useFixedDelay) { 341 this.useFixedDelay = useFixedDelay; 342 } 343 344 public LoggingLevel getRunLoggingLevel() { 345 return runLoggingLevel; 346 } 347 348 public void setRunLoggingLevel(LoggingLevel runLoggingLevel) { 349 this.runLoggingLevel = runLoggingLevel; 350 } 351 352 public PollingConsumerPollStrategy getPollStrategy() { 353 return pollStrategy; 354 } 355 356 public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) { 357 this.pollStrategy = pollStrategy; 358 } 359 360 public boolean isStartScheduler() { 361 return startScheduler; 362 } 363 364 /** 365 * Sets whether the scheduler should be started when this consumer starts. 366 * <p/> 367 * This option is default true. 368 * 369 * @param startScheduler whether to start scheduler 370 */ 371 public void setStartScheduler(boolean startScheduler) { 372 this.startScheduler = startScheduler; 373 } 374 375 public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { 376 this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle; 377 } 378 379 public boolean isSendEmptyMessageWhenIdle() { 380 return sendEmptyMessageWhenIdle; 381 } 382 383 public boolean isGreedy() { 384 return greedy; 385 } 386 387 /** 388 * If greedy then a poll is executed immediate after a previous poll that polled 1 or more messages. 389 */ 390 public void setGreedy(boolean greedy) { 391 this.greedy = greedy; 392 } 393 394 public int getBackoffCounter() { 395 return backoffCounter; 396 } 397 398 public int getBackoffMultiplier() { 399 return backoffMultiplier; 400 } 401 402 public void setBackoffMultiplier(int backoffMultiplier) { 403 this.backoffMultiplier = backoffMultiplier; 404 } 405 406 public int getBackoffIdleThreshold() { 407 return backoffIdleThreshold; 408 } 409 410 public void setBackoffIdleThreshold(int backoffIdleThreshold) { 411 this.backoffIdleThreshold = backoffIdleThreshold; 412 } 413 414 public int getBackoffErrorThreshold() { 415 return backoffErrorThreshold; 416 } 417 418 public void setBackoffErrorThreshold(int backoffErrorThreshold) { 419 this.backoffErrorThreshold = backoffErrorThreshold; 420 } 421 422 public ScheduledExecutorService getScheduledExecutorService() { 423 return scheduledExecutorService; 424 } 425 426 /** 427 * Whether the scheduler has been started. 428 * <p/> 429 * The scheduler can be started with the {@link #startScheduler()} method. 430 * 431 * @return <tt>true</tt> if started, <tt>false</tt> if not. 432 */ 433 public boolean isSchedulerStarted() { 434 return scheduler.isSchedulerStarted(); 435 } 436 437 /** 438 * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool 439 * <p/> 440 * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread 441 * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool 442 * when the consumer is started/stopped etc.) 443 * 444 * @param scheduledExecutorService the custom thread pool to use 445 */ 446 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { 447 this.scheduledExecutorService = scheduledExecutorService; 448 } 449 450 // Implementation methods 451 // ------------------------------------------------------------------------- 452 453 /** 454 * The polling method which is invoked periodically to poll this consumer 455 * 456 * @return number of messages polled, will be <tt>0</tt> if no message was polled at all. 457 * @throws Exception can be thrown if an exception occurred during polling 458 */ 459 protected abstract int poll() throws Exception; 460 461 @Override 462 protected void doStart() throws Exception { 463 super.doStart(); 464 465 // validate that if backoff multiplier is in use, the threshold values is set correclty 466 if (backoffMultiplier > 0) { 467 if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) { 468 throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured to a positive value when using backoffMultiplier"); 469 } 470 LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", new Object[]{backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()}); 471 } 472 473 if (scheduler == null) { 474 scheduler = new DefaultScheduledPollConsumerScheduler(); 475 } 476 scheduler.setCamelContext(getEndpoint().getCamelContext()); 477 scheduler.onInit(this); 478 scheduler.scheduleTask(this); 479 480 // configure scheduler with options from this consumer 481 Map<String, Object> properties = new HashMap<String, Object>(); 482 IntrospectionSupport.getProperties(this, properties, null); 483 IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties); 484 if (schedulerProperties != null && !schedulerProperties.isEmpty()) { 485 // need to use a copy in case the consumer is restarted so we keep the properties 486 Map<String, Object> copy = new HashMap<String, Object>(schedulerProperties); 487 IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, copy); 488 if (copy.size() > 0) { 489 throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size() 490 + " scheduler parameters that couldn't be set on the endpoint." 491 + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 492 + " Unknown parameters=[" + copy + "]"); 493 } 494 } 495 496 ObjectHelper.notNull(scheduler, "scheduler", this); 497 ObjectHelper.notNull(pollStrategy, "pollStrategy", this); 498 499 ServiceHelper.startService(scheduler); 500 501 if (isStartScheduler()) { 502 startScheduler(); 503 } 504 } 505 506 /** 507 * Starts the scheduler. 508 * <p/> 509 * If the scheduler is already started, then this is a noop method call. 510 */ 511 public void startScheduler() { 512 scheduler.startScheduler(); 513 } 514 515 @Override 516 protected void doStop() throws Exception { 517 ServiceHelper.stopService(scheduler); 518 519 // clear counters 520 backoffCounter = 0; 521 idleCounter = 0; 522 errorCounter = 0; 523 524 super.doStop(); 525 } 526 527 @Override 528 protected void doShutdown() throws Exception { 529 ServiceHelper.stopAndShutdownServices(scheduler); 530 super.doShutdown(); 531 } 532 533 @Override 534 protected void doSuspend() throws Exception { 535 // dont stop/cancel the future task since we just check in the run method 536 } 537 538 @Override 539 public void onInit() throws Exception { 540 // make sure the scheduler is starter 541 startScheduler = true; 542 } 543 544 @Override 545 public long beforePoll(long timeout) throws Exception { 546 LOG.trace("Before poll {}", getEndpoint()); 547 // resume or start our self 548 if (!ServiceHelper.resumeService(this)) { 549 ServiceHelper.startService(this); 550 } 551 552 // ensure at least timeout is as long as one poll delay 553 return Math.max(timeout, getDelay()); 554 } 555 556 @Override 557 public void afterPoll() throws Exception { 558 LOG.trace("After poll {}", getEndpoint()); 559 // suspend or stop our self 560 if (!ServiceHelper.suspendService(this)) { 561 ServiceHelper.stopService(this); 562 } 563 } 564 565 }