001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.BlockingQueue; 022 023 import org.apache.camel.Endpoint; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.impl.UriEndpointComponent; 026 import org.slf4j.Logger; 027 import org.slf4j.LoggerFactory; 028 029 /** 030 * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a> 031 * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext 032 * 033 * @version 034 */ 035 public class SedaComponent extends UriEndpointComponent { 036 protected final Logger log = LoggerFactory.getLogger(getClass()); 037 protected final int maxConcurrentConsumers = 500; 038 protected int queueSize; 039 protected int defaultConcurrentConsumers = 1; 040 private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>(); 041 private BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<Exchange>(); 042 043 public SedaComponent() { 044 super(SedaEndpoint.class); 045 } 046 047 public void setQueueSize(int size) { 048 queueSize = size; 049 } 050 051 public int getQueueSize() { 052 return queueSize; 053 } 054 055 public void setConcurrentConsumers(int size) { 056 defaultConcurrentConsumers = size; 057 } 058 059 public int getConcurrentConsumers() { 060 return defaultConcurrentConsumers; 061 } 062 063 public BlockingQueueFactory<Exchange> getDefaultQueueFactory() { 064 return defaultQueueFactory; 065 } 066 067 public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) { 068 this.defaultQueueFactory = defaultQueueFactory; 069 } 070 071 /** 072 * @deprecated use 073 */ 074 @Deprecated 075 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) { 076 return getOrCreateQueue(endpoint, size, null); 077 } 078 079 /** 080 * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)} 081 */ 082 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) { 083 return getOrCreateQueue(endpoint, size, multipleConsumers, null); 084 } 085 086 public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) { 087 String key = getQueueKey(endpoint.getEndpointUri()); 088 089 QueueReference ref = getQueues().get(key); 090 if (ref != null) { 091 092 // if the given size is not provided, we just use the existing queue as is 093 if (size != null && !size.equals(ref.getSize())) { 094 // there is already a queue, so make sure the size matches 095 throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size " 096 + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size); 097 } 098 // add the reference before returning queue 099 ref.addReference(endpoint); 100 101 if (log.isDebugEnabled()) { 102 log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()}); 103 } 104 return ref; 105 } 106 107 // create queue 108 BlockingQueue<Exchange> queue; 109 BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory; 110 if (size != null && size > 0) { 111 queue = queueFactory.create(size); 112 } else { 113 if (getQueueSize() > 0) { 114 size = getQueueSize(); 115 queue = queueFactory.create(getQueueSize()); 116 } else { 117 queue = queueFactory.create(); 118 } 119 } 120 log.debug("Created queue {} with size {}", key, size); 121 122 // create and add a new reference queue 123 ref = new QueueReference(queue, size, multipleConsumers); 124 ref.addReference(endpoint); 125 getQueues().put(key, ref); 126 127 return ref; 128 } 129 130 public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) { 131 String key = getQueueKey(endpoint.getEndpointUri()); 132 133 QueueReference ref = getQueues().get(key); 134 if (ref == null) { 135 ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers()); 136 ref.addReference(endpoint); 137 getQueues().put(key, ref); 138 } 139 140 return ref; 141 } 142 143 public Map<String, QueueReference> getQueues() { 144 return queues; 145 } 146 147 public QueueReference getQueueReference(String key) { 148 return queues.get(key); 149 } 150 151 @Override 152 @SuppressWarnings("unchecked") 153 protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { 154 int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); 155 boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); 156 if (limitConcurrentConsumers && consumers > maxConcurrentConsumers) { 157 throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " 158 + maxConcurrentConsumers + " was " + consumers); 159 } 160 // Resolve queue reference 161 BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class); 162 SedaEndpoint answer; 163 // Resolve queue factory when no queue specified 164 if (queue == null) { 165 BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class); 166 // defer creating queue till endpoint is started, so we pass the queue factory 167 answer = new SedaEndpoint(uri, this, queueFactory, consumers); 168 } else { 169 answer = new SedaEndpoint(uri, this, queue, consumers); 170 } 171 answer.configureProperties(parameters); 172 return answer; 173 } 174 175 public String getQueueKey(String uri) { 176 if (uri.contains("?")) { 177 // strip parameters 178 uri = uri.substring(0, uri.indexOf('?')); 179 } 180 return uri; 181 } 182 183 @Override 184 protected void doStop() throws Exception { 185 getQueues().clear(); 186 super.doStop(); 187 } 188 189 /** 190 * On shutting down the endpoint 191 * 192 * @param endpoint the endpoint 193 */ 194 void onShutdownEndpoint(SedaEndpoint endpoint) { 195 // we need to remove the endpoint from the reference counter 196 String key = getQueueKey(endpoint.getEndpointUri()); 197 QueueReference ref = getQueues().get(key); 198 if (ref != null && endpoint.getConsumers().size() == 0) { 199 // only remove the endpoint when the consumers are removed 200 ref.removeReference(endpoint); 201 if (ref.getCount() <= 0) { 202 // reference no longer needed so remove from queues 203 getQueues().remove(key); 204 } 205 } 206 } 207 208 }