001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.concurrent.ArrayBlockingQueue; 020 import java.util.concurrent.BlockingQueue; 021 import java.util.concurrent.RejectedExecutionException; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.Consumer; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.PollingConsumerPollingStrategy; 028 import org.apache.camel.Processor; 029 import org.apache.camel.spi.ExceptionHandler; 030 import org.apache.camel.util.ServiceHelper; 031 import org.slf4j.Logger; 032 import org.slf4j.LoggerFactory; 033 034 /** 035 * A default implementation of the {@link org.apache.camel.PollingConsumer} which uses the normal 036 * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow 037 * the caller to pull messages on demand. 038 * 039 * @version 040 */ 041 public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor { 042 private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class); 043 private final BlockingQueue<Exchange> queue; 044 private ExceptionHandler interruptedExceptionHandler; 045 private Consumer consumer; 046 047 public EventDrivenPollingConsumer(Endpoint endpoint) { 048 this(endpoint, new ArrayBlockingQueue<Exchange>(1000)); 049 } 050 051 public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) { 052 super(endpoint); 053 this.queue = queue; 054 this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); 055 } 056 057 public Exchange receiveNoWait() { 058 return receive(0); 059 } 060 061 public Exchange receive() { 062 // must be started 063 if (!isRunAllowed() || !isStarted()) { 064 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 065 } 066 067 while (isRunAllowed()) { 068 try { 069 beforePoll(0); 070 // take will block waiting for message 071 return queue.take(); 072 } catch (InterruptedException e) { 073 handleInterruptedException(e); 074 } finally { 075 afterPoll(); 076 } 077 } 078 LOG.trace("Consumer is not running, so returning null"); 079 return null; 080 } 081 082 public Exchange receive(long timeout) { 083 // must be started 084 if (!isRunAllowed() || !isStarted()) { 085 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 086 } 087 088 try { 089 // use the timeout value returned from beforePoll 090 timeout = beforePoll(timeout); 091 return queue.poll(timeout, TimeUnit.MILLISECONDS); 092 } catch (InterruptedException e) { 093 handleInterruptedException(e); 094 return null; 095 } finally { 096 afterPoll(); 097 } 098 } 099 100 public void process(Exchange exchange) throws Exception { 101 queue.offer(exchange); 102 } 103 104 public ExceptionHandler getInterruptedExceptionHandler() { 105 return interruptedExceptionHandler; 106 } 107 108 public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) { 109 this.interruptedExceptionHandler = interruptedExceptionHandler; 110 } 111 112 protected void handleInterruptedException(InterruptedException e) { 113 getInterruptedExceptionHandler().handleException(e); 114 } 115 116 protected long beforePoll(long timeout) { 117 if (consumer instanceof PollingConsumerPollingStrategy) { 118 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 119 try { 120 timeout = strategy.beforePoll(timeout); 121 } catch (Exception e) { 122 LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e); 123 } 124 } 125 return timeout; 126 } 127 128 protected void afterPoll() { 129 if (consumer instanceof PollingConsumerPollingStrategy) { 130 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 131 try { 132 strategy.afterPoll(); 133 } catch (Exception e) { 134 LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e); 135 } 136 } 137 } 138 139 protected void doStart() throws Exception { 140 // lets add ourselves as a consumer 141 consumer = getEndpoint().createConsumer(this); 142 143 // if the consumer has a polling strategy then invoke that 144 if (consumer instanceof PollingConsumerPollingStrategy) { 145 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 146 strategy.onInit(); 147 } else { 148 ServiceHelper.startService(consumer); 149 } 150 } 151 152 protected void doStop() throws Exception { 153 ServiceHelper.stopService(consumer); 154 } 155 156 protected void doShutdown() throws Exception { 157 ServiceHelper.stopAndShutdownService(consumer); 158 } 159 }