001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ArrayBlockingQueue;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.RejectedExecutionException;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.Consumer;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.PollingConsumerPollingStrategy;
028    import org.apache.camel.Processor;
029    import org.apache.camel.spi.ExceptionHandler;
030    import org.apache.camel.util.ServiceHelper;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * A default implementation of the {@link org.apache.camel.PollingConsumer} which uses the normal
036     * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow
037     * the caller to pull messages on demand.
038     *
039     * @version 
040     */
041    public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor {
042        private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
043        private final BlockingQueue<Exchange> queue;
044        private ExceptionHandler interruptedExceptionHandler;
045        private Consumer consumer;
046    
047        public EventDrivenPollingConsumer(Endpoint endpoint) {
048            this(endpoint, new ArrayBlockingQueue<Exchange>(1000));
049        }
050    
051        public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
052            super(endpoint);
053            this.queue = queue;
054            this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
055        }
056    
057        public Exchange receiveNoWait() {
058            return receive(0);
059        }
060    
061        public Exchange receive() {
062            // must be started
063            if (!isRunAllowed() || !isStarted()) {
064                throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
065            }
066    
067            while (isRunAllowed()) {
068                try {
069                    beforePoll(0);
070                    // take will block waiting for message
071                    return queue.take();
072                } catch (InterruptedException e) {
073                    handleInterruptedException(e);
074                } finally {
075                    afterPoll();
076                }
077            }
078            LOG.trace("Consumer is not running, so returning null");
079            return null;
080        }
081    
082        public Exchange receive(long timeout) {
083            // must be started
084            if (!isRunAllowed() || !isStarted()) {
085                throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
086            }
087    
088            try {
089                // use the timeout value returned from beforePoll
090                timeout = beforePoll(timeout);
091                return queue.poll(timeout, TimeUnit.MILLISECONDS);
092            } catch (InterruptedException e) {
093                handleInterruptedException(e);
094                return null;
095            } finally {
096                afterPoll();
097            }
098        }
099    
100        public void process(Exchange exchange) throws Exception {
101            queue.offer(exchange);
102        }
103    
104        public ExceptionHandler getInterruptedExceptionHandler() {
105            return interruptedExceptionHandler;
106        }
107    
108        public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) {
109            this.interruptedExceptionHandler = interruptedExceptionHandler;
110        }
111    
112        protected void handleInterruptedException(InterruptedException e) {
113            getInterruptedExceptionHandler().handleException(e);
114        }
115    
116        protected long beforePoll(long timeout) {
117            if (consumer instanceof PollingConsumerPollingStrategy) {
118                PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
119                try {
120                    timeout = strategy.beforePoll(timeout);
121                } catch (Exception e) {
122                    LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
123                }
124            }
125            return timeout;
126        }
127    
128        protected void afterPoll() {
129            if (consumer instanceof PollingConsumerPollingStrategy) {
130                PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
131                try {
132                    strategy.afterPoll();
133                } catch (Exception e) {
134                    LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e);
135                }
136            }
137        }
138    
139        protected void doStart() throws Exception {
140            // lets add ourselves as a consumer
141            consumer = getEndpoint().createConsumer(this);
142    
143            // if the consumer has a polling strategy then invoke that
144            if (consumer instanceof PollingConsumerPollingStrategy) {
145                PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
146                strategy.onInit();
147            } else {
148                ServiceHelper.startService(consumer);
149            }
150        }
151    
152        protected void doStop() throws Exception {
153            ServiceHelper.stopService(consumer);
154        }
155    
156        protected void doShutdown() throws Exception {
157            ServiceHelper.stopAndShutdownService(consumer);
158        }
159    }