001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.List;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.CountDownLatch;
022    import java.util.concurrent.ExecutorService;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.AsyncProcessor;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Endpoint;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Processor;
032    import org.apache.camel.ShutdownRunningTask;
033    import org.apache.camel.SuspendableService;
034    import org.apache.camel.impl.LoggingExceptionHandler;
035    import org.apache.camel.processor.MulticastProcessor;
036    import org.apache.camel.spi.ExceptionHandler;
037    import org.apache.camel.spi.ShutdownAware;
038    import org.apache.camel.spi.Synchronization;
039    import org.apache.camel.support.ServiceSupport;
040    import org.apache.camel.util.AsyncProcessorConverterHelper;
041    import org.apache.camel.util.ExchangeHelper;
042    import org.apache.camel.util.ObjectHelper;
043    import org.apache.camel.util.UnitOfWorkHelper;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * A Consumer for the SEDA component.
049     * <p/>
050     * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which
051     * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second.
052     *
053     * @version 
054     */
055    public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, SuspendableService {
056        private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class);
057    
058        private final AtomicInteger taskCount = new AtomicInteger();
059        private volatile CountDownLatch latch;
060        private volatile boolean shutdownPending;
061        private volatile boolean forceShutdown;
062        private SedaEndpoint endpoint;
063        private AsyncProcessor processor;
064        private ExecutorService executor;
065        private ExceptionHandler exceptionHandler;
066        private final int pollTimeout;
067    
068        public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
069            this.endpoint = endpoint;
070            this.processor = AsyncProcessorConverterHelper.convert(processor);
071            this.pollTimeout = endpoint.getPollTimeout();
072            this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
073        }
074    
075        @Override
076        public String toString() {
077            return "SedaConsumer[" + endpoint + "]";
078        }
079    
080        public Endpoint getEndpoint() {
081            return endpoint;
082        }
083    
084        public ExceptionHandler getExceptionHandler() {
085            return exceptionHandler;
086        }
087    
088        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
089            this.exceptionHandler = exceptionHandler;
090        }
091    
092        public Processor getProcessor() {
093            return processor;
094        }
095    
096        public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
097            // deny stopping on shutdown as we want seda consumers to run in case some other queues
098            // depend on this consumer to run, so it can complete its exchanges
099            return true;
100        }
101    
102        public int getPendingExchangesSize() {
103            // the route is shutting down, so either we should purge the queue,
104            // or return how many exchanges are still on the queue
105            if (endpoint.isPurgeWhenStopping()) {
106                endpoint.purgeQueue();
107            }
108            return endpoint.getQueue().size();
109        }
110    
111        @Override
112        public void prepareShutdown(boolean forced) {
113            // signal we want to shutdown
114            shutdownPending = true;
115            forceShutdown = forced;
116    
117            if (latch != null) {
118                LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount());
119    
120                // wait for all threads to end
121                try {
122                    latch.await();
123                } catch (InterruptedException e) {
124                    // ignore
125                }
126            }
127        }
128    
129        @Override
130        public boolean isRunAllowed() {
131            // if we force shutdown then do not allow running anymore
132            if (forceShutdown) {
133                return false;
134            }
135    
136            if (isSuspending() || isSuspended()) {
137                // allow to run even if we are suspended as we want to
138                // keep the thread task running
139                return true;
140            }
141            return super.isRunAllowed();
142        }
143    
144        public void run() {
145            taskCount.incrementAndGet();
146            try {
147                doRun();
148            } finally {
149                taskCount.decrementAndGet();
150            }
151        }
152    
153        protected void doRun() {
154            BlockingQueue<Exchange> queue = endpoint.getQueue();
155            // loop while we are allowed, or if we are stopping loop until the queue is empty
156            while (queue != null && (isRunAllowed())) {
157    
158                // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started
159                if (getEndpoint().getCamelContext().getStatus().isStarting()) {
160                    LOG.trace("CamelContext is starting so skip polling");
161                    try {
162                        // sleep at most 1 sec
163                        Thread.sleep(Math.min(pollTimeout, 1000));
164                    } catch (InterruptedException e) {
165                        LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
166                    }
167                    continue;
168                }
169    
170                // do not poll if we are suspended
171                if (isSuspending() || isSuspended()) {
172                    if (shutdownPending && queue.isEmpty()) {
173                        LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty.");
174                        // we want to shutdown so break out if there queue is empty
175                        break;
176                    } else {
177                        LOG.trace("Consumer is suspended so skip polling");
178                        try {
179                            // sleep at most 1 sec
180                            Thread.sleep(Math.min(pollTimeout, 1000));
181                        } catch (InterruptedException e) {
182                            LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
183                        }
184                        continue;
185                    }
186                }
187    
188                Exchange exchange = null;
189                try {
190                    // use the end user configured poll timeout
191                    exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
192                    if (LOG.isTraceEnabled()) {
193                        LOG.trace("Polled queue {} with timeout {} ms. -> {}", new Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange});
194                    }
195                    if (exchange != null) {
196                        try {
197                            // send a new copied exchange with new camel context
198                            Exchange newExchange = prepareExchange(exchange);
199                            // process the exchange
200                            sendToConsumers(newExchange);
201                            // copy the message back
202                            if (newExchange.hasOut()) {
203                                exchange.setOut(newExchange.getOut().copy());
204                            } else {
205                                exchange.setIn(newExchange.getIn());
206                            }
207                            // log exception if an exception occurred and was not handled
208                            if (newExchange.getException() != null) {
209                                exchange.setException(newExchange.getException());
210                                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
211                            }
212                        } catch (Exception e) {
213                            getExceptionHandler().handleException("Error processing exchange", exchange, e);
214                        }
215                    } else if (shutdownPending && queue.isEmpty()) {
216                        LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty.");
217                        // we want to shutdown so break out if there queue is empty
218                        break;
219                    }
220                } catch (InterruptedException e) {
221                    LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
222                    continue;
223                } catch (Throwable e) {
224                    if (exchange != null) {
225                        getExceptionHandler().handleException("Error processing exchange", exchange, e);
226                    } else {
227                        getExceptionHandler().handleException(e);
228                    }
229                }
230            }
231    
232            latch.countDown();
233            LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount());
234        }
235    
236        /**
237         * Strategy to prepare exchange for being processed by this consumer
238         *
239         * @param exchange the exchange
240         * @return the exchange to process by this consumer.
241         */
242        protected Exchange prepareExchange(Exchange exchange) {
243            // send a new copied exchange with new camel context
244            Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
245            // set the from endpoint
246            newExchange.setFromEndpoint(endpoint);
247            return newExchange;
248        }
249    
250        /**
251         * Send the given {@link Exchange} to the consumer(s).
252         * <p/>
253         * If multiple consumers then they will each receive a copy of the Exchange.
254         * A multicast processor will send the exchange in parallel to the multiple consumers.
255         * <p/>
256         * If there is only a single consumer then its dispatched directly to it using same thread.
257         * 
258         * @param exchange the exchange
259         * @throws Exception can be thrown if processing of the exchange failed
260         */
261        protected void sendToConsumers(final Exchange exchange) throws Exception {
262            // validate multiple consumers has been enabled
263            int size = endpoint.getConsumers().size();
264            if (size > 1 && !endpoint.isMultipleConsumersSupported()) {
265                throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint);
266            }
267    
268            // if there are multiple consumers then multicast to them
269            if (endpoint.isMultipleConsumersSupported()) {
270    
271                if (LOG.isTraceEnabled()) {
272                    LOG.trace("Multicasting to {} consumers for Exchange: {}", size, exchange);
273                }
274    
275                // handover completions, as we need to done this when the multicast is done
276                final List<Synchronization> completions = exchange.handoverCompletions();
277    
278                // use a multicast processor to process it
279                MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
280                ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
281    
282                // and use the asynchronous routing engine to support it
283                mp.process(exchange, new AsyncCallback() {
284                    public void done(boolean doneSync) {
285                        // done the uow on the completions
286                        UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG);
287                    }
288                });
289            } else {
290                // use the regular processor and use the asynchronous routing engine to support it
291                processor.process(exchange, new AsyncCallback() {
292                    public void done(boolean doneSync) {
293                        // noop
294                    }
295                });
296            }
297        }
298    
299        protected void doStart() throws Exception {
300            latch = new CountDownLatch(endpoint.getConcurrentConsumers());
301            shutdownPending = false;
302            forceShutdown = false;
303    
304            setupTasks();
305            endpoint.onStarted(this);
306        }
307    
308        @Override
309        protected void doSuspend() throws Exception {
310            endpoint.onStopped(this);
311        }
312    
313        @Override
314        protected void doResume() throws Exception {
315            doStart();
316        }
317    
318        protected void doStop() throws Exception {
319            // ensure queue is purged if we stop the consumer
320            if (endpoint.isPurgeWhenStopping()) {
321                endpoint.purgeQueue();
322            }
323    
324            endpoint.onStopped(this);
325            
326            shutdownExecutor();
327        }
328    
329        @Override
330        protected void doShutdown() throws Exception {
331            shutdownExecutor();
332        }
333    
334        private void shutdownExecutor() {
335            if (executor != null) {
336                endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
337                executor = null;
338            }
339        }
340    
341        /**
342         * Setup the thread pool and ensures tasks gets executed (if needed)
343         */
344        private void setupTasks() {
345            int poolSize = endpoint.getConcurrentConsumers();
346    
347            // create thread pool if needed
348            if (executor == null) {
349                executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
350            }
351    
352            // submit needed number of tasks
353            int tasks = poolSize - taskCount.get();
354            LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout);
355            for (int i = 0; i < tasks; i++) {
356                executor.execute(this);
357            }
358        }
359    
360    }