001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.List; 020 import java.util.concurrent.BlockingQueue; 021 import java.util.concurrent.CountDownLatch; 022 import java.util.concurrent.ExecutorService; 023 import java.util.concurrent.TimeUnit; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.camel.AsyncCallback; 027 import org.apache.camel.AsyncProcessor; 028 import org.apache.camel.Consumer; 029 import org.apache.camel.Endpoint; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Processor; 032 import org.apache.camel.ShutdownRunningTask; 033 import org.apache.camel.SuspendableService; 034 import org.apache.camel.impl.LoggingExceptionHandler; 035 import org.apache.camel.processor.MulticastProcessor; 036 import org.apache.camel.spi.ExceptionHandler; 037 import org.apache.camel.spi.ShutdownAware; 038 import org.apache.camel.spi.Synchronization; 039 import org.apache.camel.support.ServiceSupport; 040 import org.apache.camel.util.AsyncProcessorConverterHelper; 041 import org.apache.camel.util.ExchangeHelper; 042 import org.apache.camel.util.ObjectHelper; 043 import org.apache.camel.util.UnitOfWorkHelper; 044 import org.slf4j.Logger; 045 import org.slf4j.LoggerFactory; 046 047 /** 048 * A Consumer for the SEDA component. 049 * <p/> 050 * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which 051 * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second. 052 * 053 * @version 054 */ 055 public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, SuspendableService { 056 private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); 057 058 private final AtomicInteger taskCount = new AtomicInteger(); 059 private volatile CountDownLatch latch; 060 private volatile boolean shutdownPending; 061 private volatile boolean forceShutdown; 062 private SedaEndpoint endpoint; 063 private AsyncProcessor processor; 064 private ExecutorService executor; 065 private ExceptionHandler exceptionHandler; 066 private final int pollTimeout; 067 068 public SedaConsumer(SedaEndpoint endpoint, Processor processor) { 069 this.endpoint = endpoint; 070 this.processor = AsyncProcessorConverterHelper.convert(processor); 071 this.pollTimeout = endpoint.getPollTimeout(); 072 this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); 073 } 074 075 @Override 076 public String toString() { 077 return "SedaConsumer[" + endpoint + "]"; 078 } 079 080 public Endpoint getEndpoint() { 081 return endpoint; 082 } 083 084 public ExceptionHandler getExceptionHandler() { 085 return exceptionHandler; 086 } 087 088 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 089 this.exceptionHandler = exceptionHandler; 090 } 091 092 public Processor getProcessor() { 093 return processor; 094 } 095 096 public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { 097 // deny stopping on shutdown as we want seda consumers to run in case some other queues 098 // depend on this consumer to run, so it can complete its exchanges 099 return true; 100 } 101 102 public int getPendingExchangesSize() { 103 // the route is shutting down, so either we should purge the queue, 104 // or return how many exchanges are still on the queue 105 if (endpoint.isPurgeWhenStopping()) { 106 endpoint.purgeQueue(); 107 } 108 return endpoint.getQueue().size(); 109 } 110 111 @Override 112 public void prepareShutdown(boolean forced) { 113 // signal we want to shutdown 114 shutdownPending = true; 115 forceShutdown = forced; 116 117 if (latch != null) { 118 LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); 119 120 // wait for all threads to end 121 try { 122 latch.await(); 123 } catch (InterruptedException e) { 124 // ignore 125 } 126 } 127 } 128 129 @Override 130 public boolean isRunAllowed() { 131 // if we force shutdown then do not allow running anymore 132 if (forceShutdown) { 133 return false; 134 } 135 136 if (isSuspending() || isSuspended()) { 137 // allow to run even if we are suspended as we want to 138 // keep the thread task running 139 return true; 140 } 141 return super.isRunAllowed(); 142 } 143 144 public void run() { 145 taskCount.incrementAndGet(); 146 try { 147 doRun(); 148 } finally { 149 taskCount.decrementAndGet(); 150 } 151 } 152 153 protected void doRun() { 154 BlockingQueue<Exchange> queue = endpoint.getQueue(); 155 // loop while we are allowed, or if we are stopping loop until the queue is empty 156 while (queue != null && (isRunAllowed())) { 157 158 // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started 159 if (getEndpoint().getCamelContext().getStatus().isStarting()) { 160 LOG.trace("CamelContext is starting so skip polling"); 161 try { 162 // sleep at most 1 sec 163 Thread.sleep(Math.min(pollTimeout, 1000)); 164 } catch (InterruptedException e) { 165 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 166 } 167 continue; 168 } 169 170 // do not poll if we are suspended 171 if (isSuspending() || isSuspended()) { 172 if (shutdownPending && queue.isEmpty()) { 173 LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); 174 // we want to shutdown so break out if there queue is empty 175 break; 176 } else { 177 LOG.trace("Consumer is suspended so skip polling"); 178 try { 179 // sleep at most 1 sec 180 Thread.sleep(Math.min(pollTimeout, 1000)); 181 } catch (InterruptedException e) { 182 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 183 } 184 continue; 185 } 186 } 187 188 Exchange exchange = null; 189 try { 190 // use the end user configured poll timeout 191 exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); 192 if (LOG.isTraceEnabled()) { 193 LOG.trace("Polled queue {} with timeout {} ms. -> {}", new Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange}); 194 } 195 if (exchange != null) { 196 try { 197 // send a new copied exchange with new camel context 198 Exchange newExchange = prepareExchange(exchange); 199 // process the exchange 200 sendToConsumers(newExchange); 201 // copy the message back 202 if (newExchange.hasOut()) { 203 exchange.setOut(newExchange.getOut().copy()); 204 } else { 205 exchange.setIn(newExchange.getIn()); 206 } 207 // log exception if an exception occurred and was not handled 208 if (newExchange.getException() != null) { 209 exchange.setException(newExchange.getException()); 210 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 211 } 212 } catch (Exception e) { 213 getExceptionHandler().handleException("Error processing exchange", exchange, e); 214 } 215 } else if (shutdownPending && queue.isEmpty()) { 216 LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); 217 // we want to shutdown so break out if there queue is empty 218 break; 219 } 220 } catch (InterruptedException e) { 221 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 222 continue; 223 } catch (Throwable e) { 224 if (exchange != null) { 225 getExceptionHandler().handleException("Error processing exchange", exchange, e); 226 } else { 227 getExceptionHandler().handleException(e); 228 } 229 } 230 } 231 232 latch.countDown(); 233 LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount()); 234 } 235 236 /** 237 * Strategy to prepare exchange for being processed by this consumer 238 * 239 * @param exchange the exchange 240 * @return the exchange to process by this consumer. 241 */ 242 protected Exchange prepareExchange(Exchange exchange) { 243 // send a new copied exchange with new camel context 244 Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext()); 245 // set the from endpoint 246 newExchange.setFromEndpoint(endpoint); 247 return newExchange; 248 } 249 250 /** 251 * Send the given {@link Exchange} to the consumer(s). 252 * <p/> 253 * If multiple consumers then they will each receive a copy of the Exchange. 254 * A multicast processor will send the exchange in parallel to the multiple consumers. 255 * <p/> 256 * If there is only a single consumer then its dispatched directly to it using same thread. 257 * 258 * @param exchange the exchange 259 * @throws Exception can be thrown if processing of the exchange failed 260 */ 261 protected void sendToConsumers(final Exchange exchange) throws Exception { 262 // validate multiple consumers has been enabled 263 int size = endpoint.getConsumers().size(); 264 if (size > 1 && !endpoint.isMultipleConsumersSupported()) { 265 throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); 266 } 267 268 // if there are multiple consumers then multicast to them 269 if (endpoint.isMultipleConsumersSupported()) { 270 271 if (LOG.isTraceEnabled()) { 272 LOG.trace("Multicasting to {} consumers for Exchange: {}", size, exchange); 273 } 274 275 // handover completions, as we need to done this when the multicast is done 276 final List<Synchronization> completions = exchange.handoverCompletions(); 277 278 // use a multicast processor to process it 279 MulticastProcessor mp = endpoint.getConsumerMulticastProcessor(); 280 ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this); 281 282 // and use the asynchronous routing engine to support it 283 mp.process(exchange, new AsyncCallback() { 284 public void done(boolean doneSync) { 285 // done the uow on the completions 286 UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG); 287 } 288 }); 289 } else { 290 // use the regular processor and use the asynchronous routing engine to support it 291 processor.process(exchange, new AsyncCallback() { 292 public void done(boolean doneSync) { 293 // noop 294 } 295 }); 296 } 297 } 298 299 protected void doStart() throws Exception { 300 latch = new CountDownLatch(endpoint.getConcurrentConsumers()); 301 shutdownPending = false; 302 forceShutdown = false; 303 304 setupTasks(); 305 endpoint.onStarted(this); 306 } 307 308 @Override 309 protected void doSuspend() throws Exception { 310 endpoint.onStopped(this); 311 } 312 313 @Override 314 protected void doResume() throws Exception { 315 doStart(); 316 } 317 318 protected void doStop() throws Exception { 319 // ensure queue is purged if we stop the consumer 320 if (endpoint.isPurgeWhenStopping()) { 321 endpoint.purgeQueue(); 322 } 323 324 endpoint.onStopped(this); 325 326 shutdownExecutor(); 327 } 328 329 @Override 330 protected void doShutdown() throws Exception { 331 shutdownExecutor(); 332 } 333 334 private void shutdownExecutor() { 335 if (executor != null) { 336 endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); 337 executor = null; 338 } 339 } 340 341 /** 342 * Setup the thread pool and ensures tasks gets executed (if needed) 343 */ 344 private void setupTasks() { 345 int poolSize = endpoint.getConcurrentConsumers(); 346 347 // create thread pool if needed 348 if (executor == null) { 349 executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 350 } 351 352 // submit needed number of tasks 353 int tasks = poolSize - taskCount.get(); 354 LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout); 355 for (int i = 0; i < tasks; i++) { 356 executor.execute(this); 357 } 358 } 359 360 }