/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the <a
 * href="http://camel.apache.org/queue.html">Queue components</a> for
 * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
 */
@ManagedResource(description = "Managed SedaEndpoint")
@UriEndpoint(scheme = "seda", consumerClass = SedaConsumer.class)
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);

    // lazily resolved by getQueue() and reset to null by doShutdown(), so it may be null at any time
    private volatile BlockingQueue<Exchange> queue;
    private final Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
    private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
    // rebuilt by updateMulticastProcessor() and started on demand by getConsumerMulticastProcessor()
    private volatile MulticastProcessor consumerMulticastProcessor;
    private volatile boolean multicastStarted;
    private volatile ExecutorService multicastExecutor;
    @UriParam
    private int size = Integer.MAX_VALUE;
    @UriParam
    private int concurrentConsumers = 1;
    @UriParam
    private boolean multipleConsumers;
    @UriParam
    private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
    @UriParam
    private long timeout = 30000;
    @UriParam
    private boolean blockWhenFull;
    @UriParam
    private int pollTimeout = 1000;
    @UriParam
    private boolean purgeWhenStopping;

    @UriParam
    private boolean failIfNoConsumers;

    private BlockingQueueFactory<Exchange> queueFactory;

    /**
     * Creates an endpoint without URI or component, using a {@link LinkedBlockingQueueFactory}
     * to create its queue when needed.
     */
    public SedaEndpoint() {
        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
    }

    /**
     * Creates an endpoint backed by the given queue, with a single concurrent consumer.
     *
     * @param endpointUri the endpoint uri
     * @param component   the owning component
     * @param queue       the queue to use, may be <tt>null</tt>
     */
    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
        this(endpointUri, component, queue, 1);
    }

    /**
     * Creates an endpoint backed by the given queue and registers that queue on the component.
     *
     * @param endpointUri         the endpoint uri
     * @param component           the owning component
     * @param queue               the queue to use, may be <tt>null</tt>
     * @param concurrentConsumers number of concurrent consumers
     */
    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
        this(endpointUri, component, concurrentConsumers);
        this.queue = queue;
        if (queue != null) {
            this.size = queue.remainingCapacity();
        }
        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
        // the rest of this class accepts an endpoint without a component, so do the same here
        SedaComponent seda = getComponent();
        if (seda != null) {
            seda.registerQueue(this, queue);
        }
    }

    /**
     * Creates an endpoint whose queue is created by the given factory.
     *
     * @param endpointUri         the endpoint uri
     * @param component           the owning component
     * @param queueFactory        factory used to create the queue
     * @param concurrentConsumers number of concurrent consumers
     */
    public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
        this(endpointUri, component, concurrentConsumers);
        this.queueFactory = queueFactory;
    }

    private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
        super(endpointUri, component);
        this.concurrentConsumers = concurrentConsumers;
    }

    @Override
    public SedaComponent getComponent() {
        return (SedaComponent) super.getComponent();
    }

    /**
     * Creates a {@link SedaProducer} configured with this endpoint's
     * waitForTaskToComplete, timeout and blockWhenFull options.
     */
    @Override
    public Producer createProducer() throws Exception {
        return new SedaProducer(this, getWaitForTaskToComplete(), getTimeout(), isBlockWhenFull());
    }

    /**
     * Creates a {@link SedaConsumer} for the given processor.
     *
     * @param processor the processor to receive the exchanges
     * @throws IllegalArgumentException if the component already holds a queue reference for this
     *                                  endpoint whose multipleConsumers option differs from this endpoint's
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        SedaComponent seda = getComponent();
        if (seda != null) {
            // all consumers must match having the same multipleConsumers options
            String key = seda.getQueueKey(getEndpointUri());
            QueueReference ref = seda.getQueueReference(key);
            // evaluate the option once so the check and the error message report the same value
            boolean multiple = isMultipleConsumers();
            if (ref != null && ref.getMultipleConsumers() != multiple) {
                // there is already a multiple consumers, so make sure they matches
                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers "
                        + ref.getMultipleConsumers() + " does not match given multiple consumers " + multiple);
            }
        }

        Consumer answer = new SedaConsumer(this, processor);
        configureConsumer(answer);
        return answer;
    }

    @Override
    public PollingConsumer createPollingConsumer() throws Exception {
        SedaPollingConsumer answer = new SedaPollingConsumer(this);
        configureConsumer(answer);
        return answer;
    }

    /**
     * Returns the queue used by this endpoint, resolving it on first access.
     * <p/>
     * When the endpoint has a component the queue is obtained from the component,
     * otherwise a queue is created locally via {@link #createQueue()}.
     */
    public synchronized BlockingQueue<Exchange> getQueue() {
        if (queue == null) {
            // prefer to lookup queue from component, so if this endpoint is re-created or re-started
            // then the existing queue from the component can be used, so new producers and consumers
            // can use the already existing queue referenced from the component
            SedaComponent seda = getComponent();
            if (seda != null) {
                // use null to indicate default size (= use what the existing queue has been configured with)
                Integer requestedSize = getSize() == Integer.MAX_VALUE ? null : getSize();
                QueueReference ref = seda.getOrCreateQueue(this, requestedSize, isMultipleConsumers(), queueFactory);
                queue = ref.getQueue();
                String key = seda.getQueueKey(getEndpointUri());
                Integer sharedSize = ref.getSize();
                LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, sharedSize != null ? sharedSize : Integer.MAX_VALUE});
                // and set the size we are using
                if (sharedSize != null) {
                    setSize(sharedSize);
                }
            } else {
                // fallback and create queue (as this endpoint has no component)
                queue = createQueue();
                LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()});
            }
        }
        return queue;
    }

    /**
     * Creates a new queue from the queue factory, passing the configured size
     * when it is positive and using the factory default otherwise.
     */
    protected BlockingQueue<Exchange> createQueue() {
        if (size > 0) {
            return queueFactory.create(size);
        } else {
            return queueFactory.create();
        }
    }

    /**
     * Looks up the queue reference the component holds for this endpoint.
     *
     * @return the reference, or <tt>null</tt> if there is no component or no reference (a warning is logged)
     */
    public synchronized QueueReference getQueueReference() {
        QueueReference ref = null;
        SedaComponent seda = getComponent();
        if (seda != null) {
            String key = seda.getQueueKey(getEndpointUri());
            ref = seda.getQueueReference(key);
        }
        if (ref == null) {
            // slf4j placeholders are "{}" - an indexed "{0}" would be logged verbatim
            LOG.warn("There was no queue reference for the endpoint {}", getEndpointUri());
        }
        return ref;
    }

    /**
     * Returns the multicast processor for the consumers, starting it first if it has not been started yet.
     *
     * @return the processor, or <tt>null</tt> if none has been created
     */
    protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
        if (!multicastStarted && consumerMulticastProcessor != null) {
            // only start it on-demand to avoid starting it during stopping
            ServiceHelper.startService(consumerMulticastProcessor);
            multicastStarted = true;
        }
        return consumerMulticastProcessor;
    }

    /**
     * Stops any existing multicast processor and, if there are consumers, creates a new
     * (not yet started) one over the processors of the current consumers.
     * Does nothing when multiple consumers are not supported.
     */
    protected synchronized void updateMulticastProcessor() throws Exception {
        // only needed if we support multiple consumers
        if (!isMultipleConsumersSupported()) {
            return;
        }

        // stop old before we create a new
        if (consumerMulticastProcessor != null) {
            ServiceHelper.stopService(consumerMulticastProcessor);
            consumerMulticastProcessor = null;
        }

        // take a single snapshot so the list capacity and the iteration see the same consumers
        Set<SedaConsumer> active = getConsumers();
        if (!active.isEmpty()) {
            if (multicastExecutor == null) {
                // create multicast executor as we need it when we have at least one processor
                multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)");
            }
            // create list of consumers to multicast to
            List<Processor> processors = new ArrayList<Processor>(active.size());
            for (SedaConsumer consumer : active) {
                processors.add(consumer.getProcessor());
            }
            // create multicast processor
            multicastStarted = false;
            consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false);
        }
    }

    /**
     * Sets the queue to use. When the queue is not <tt>null</tt> the size option
     * is updated from the queue's remaining capacity.
     */
    public void setQueue(BlockingQueue<Exchange> queue) {
        this.queue = queue;
        if (queue != null) {
            this.size = queue.remainingCapacity();
        }
    }

    @ManagedAttribute(description = "Queue max capacity")
    public int getSize() {
        return size;
    }

    public void setSize(int size) {
        this.size = size;
    }

    /**
     * Returns the number of exchanges currently on the queue, or <tt>0</tt> when
     * no queue is assigned (for example after shutdown).
     */
    @ManagedAttribute(description = "Current queue size")
    public int getCurrentQueueSize() {
        // read the volatile field once; doShutdown() may null it concurrently
        BlockingQueue<Exchange> current = queue;
        return current != null ? current.size() : 0;
    }

    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    @ManagedAttribute(description = "Whether the caller will block sending to a full queue")
    public boolean isBlockWhenFull() {
        return blockWhenFull;
    }

    public void setConcurrentConsumers(int concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers;
    }

    @ManagedAttribute(description = "Number of concurrent consumers")
    public int getConcurrentConsumers() {
        return concurrentConsumers;
    }

    public WaitForTaskToComplete getWaitForTaskToComplete() {
        return waitForTaskToComplete;
    }

    public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
        this.waitForTaskToComplete = waitForTaskToComplete;
    }

    @ManagedAttribute
    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    @ManagedAttribute
    public boolean isFailIfNoConsumers() {
        return failIfNoConsumers;
    }

    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
        this.failIfNoConsumers = failIfNoConsumers;
    }

    @ManagedAttribute
    public boolean isMultipleConsumers() {
        return multipleConsumers;
    }

    public void setMultipleConsumers(boolean multipleConsumers) {
        this.multipleConsumers = multipleConsumers;
    }

    @ManagedAttribute
    public int getPollTimeout() {
        return pollTimeout;
    }

    public void setPollTimeout(int pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    @ManagedAttribute
    public boolean isPurgeWhenStopping() {
        return purgeWhenStopping;
    }

    public void setPurgeWhenStopping(boolean purgeWhenStopping) {
        this.purgeWhenStopping = purgeWhenStopping;
    }

    @Override
    @ManagedAttribute(description = "Singleton")
    public boolean isSingleton() {
        return true;
    }

    /**
     * Returns the current pending exchanges
     */
    @Override
    public List<Exchange> getExchanges() {
        return new ArrayList<Exchange>(getQueue());
    }

    @Override
    @ManagedAttribute
    public boolean isMultipleConsumersSupported() {
        return isMultipleConsumers();
    }

    /**
     * Purges the queue. Does nothing when no queue is assigned (for example after shutdown).
     */
    @ManagedOperation(description = "Purges the seda queue")
    public void purgeQueue() {
        // read the volatile field once; doShutdown() may null it concurrently
        BlockingQueue<Exchange> current = queue;
        if (current == null) {
            return;
        }
        LOG.debug("Purging queue with {} exchanges", current.size());
        current.clear();
    }

    /**
     * Returns the current active consumers on this endpoint
     */
    public Set<SedaConsumer> getConsumers() {
        return new HashSet<SedaConsumer>(consumers);
    }

    /**
     * Returns the current active producers on this endpoint
     */
    public Set<SedaProducer> getProducers() {
        return new HashSet<SedaProducer>(producers);
    }

    @ManagedOperation(description = "Current number of Exchanges in Queue")
    public long queueSize() {
        // ask the queue directly instead of copying every exchange just to count them
        return getQueue().size();
    }

    /**
     * Returns the string form of the exchange at the given position of the pending exchanges.
     *
     * @param index position in the queue snapshot
     * @return the exchange as a string, or <tt>null</tt> if there is no exchange at that position
     */
    @ManagedOperation(description = "Get Exchange from queue by index")
    public String browseExchange(Integer index) {
        Exchange exchange = exchangeAt(index);
        if (exchange == null) {
            return null;
        }
        // must use java type with JMX such as java.lang.String
        return exchange.toString();
    }

    /**
     * Returns the body (as String) of the message at the given position of the pending exchanges,
     * taken from the OUT message if the exchange has one and from the IN message otherwise.
     *
     * @param index position in the queue snapshot
     * @return the body, or <tt>null</tt> if there is no exchange at that position
     */
    @ManagedOperation(description = "Get message body from queue by index")
    public String browseMessageBody(Integer index) {
        Exchange exchange = exchangeAt(index);
        if (exchange == null) {
            return null;
        }

        // must use java type with JMX such as java.lang.String
        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
        return msg.getBody(String.class);
    }

    /**
     * Dumps the message at the given position of the pending exchanges as XML,
     * using the OUT message if the exchange has one and the IN message otherwise.
     *
     * @param index       position in the queue snapshot
     * @param includeBody passed through to {@link MessageHelper#dumpAsXml}
     * @return the XML, or <tt>null</tt> if there is no exchange at that position
     */
    @ManagedOperation(description = "Get message as XML from queue by index")
    public String browseMessageAsXml(Integer index, Boolean includeBody) {
        Exchange exchange = exchangeAt(index);
        if (exchange == null) {
            return null;
        }

        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
        return MessageHelper.dumpAsXml(msg, includeBody);
    }

    @ManagedOperation(description = "Gets all the messages as XML from the queue")
    public String browseAllMessagesAsXml(Boolean includeBody) {
        return browseRangeMessagesAsXml(0, Integer.MAX_VALUE, includeBody);
    }

    @ManagedOperation(description = "Gets the range of messages as XML from the queue")
    public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) {
        return EndpointHelper.browseRangeMessagesAsXml(this, fromIndex, toIndex, includeBody);
    }

    @ManagedAttribute(description = "Camel context ID")
    public String getCamelId() {
        return getCamelContext().getName();
    }

    @ManagedAttribute(description = "Camel ManagementName")
    public String getCamelManagementName() {
        return getCamelContext().getManagementName();
    }

    @Override
    @ManagedAttribute(description = "Endpoint URI", mask = true)
    public String getEndpointUri() {
        return super.getEndpointUri();
    }

    @ManagedAttribute(description = "Endpoint service state")
    public String getState() {
        return getStatus().name();
    }

    void onStarted(SedaProducer producer) {
        producers.add(producer);
    }

    void onStopped(SedaProducer producer) {
        producers.remove(producer);
    }

    void onStarted(SedaConsumer consumer) throws Exception {
        consumers.add(consumer);
        if (isMultipleConsumers()) {
            updateMulticastProcessor();
        }
    }

    void onStopped(SedaConsumer consumer) throws Exception {
        consumers.remove(consumer);
        if (isMultipleConsumers()) {
            updateMulticastProcessor();
        }
    }

    /**
     * Whether at least one consumer is currently registered on this endpoint.
     */
    public boolean hasConsumers() {
        return !this.consumers.isEmpty();
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();

        // force creating queue when starting
        if (queue == null) {
            queue = getQueue();
        }

        // special for unit testing where we can set a system property to make seda poll faster
        // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
        String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());
        setPollTimeout(Integer.parseInt(override));
    }

    /**
     * Stops the endpoint, but only when it has no active consumers left.
     */
    @Override
    public void stop() throws Exception {
        if (getConsumers().isEmpty()) {
            super.stop();
        } else {
            LOG.debug("There is still active consumers.");
        }
    }

    /**
     * Notifies the component (if any) that this endpoint is shutting down, and then
     * shuts the endpoint down, but only when it has no active consumers left.
     */
    @Override
    public void shutdown() throws Exception {
        if (shutdown.get()) {
            LOG.trace("Service already shut down");
            return;
        }

        // notify component we are shutting down this endpoint
        if (getComponent() != null) {
            getComponent().onShutdownEndpoint(this);
        }

        if (getConsumers().isEmpty()) {
            super.shutdown();
        } else {
            LOG.debug("There is still active consumers.");
        }
    }

    @Override
    protected void doShutdown() throws Exception {
        // shutdown thread pool if it was in use
        if (multicastExecutor != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
            multicastExecutor = null;
        }

        // clear queue, as we are shutdown, so if re-created then the queue must be updated
        queue = null;
    }

    /**
     * Looks up the exchange at the given position in a snapshot of the pending exchanges.
     *
     * @return the exchange, or <tt>null</tt> if the index is <tt>null</tt>, negative or beyond the snapshot
     */
    private Exchange exchangeAt(Integer index) {
        if (index == null || index < 0) {
            return null;
        }
        List<Exchange> exchanges = getExchanges();
        if (index >= exchanges.size()) {
            return null;
        }
        return exchanges.get(index);
    }

}