001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.BlockingQueue;
022    
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.impl.UriEndpointComponent;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a>
031     * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
032     *
033     * @version 
034     */
035    public class SedaComponent extends UriEndpointComponent {
036        protected final Logger log = LoggerFactory.getLogger(getClass());
037        protected final int maxConcurrentConsumers = 500;
038        protected int queueSize;
039        protected int defaultConcurrentConsumers = 1;
040        private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
041        private BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<Exchange>();
042    
043        public SedaComponent() {
044            super(SedaEndpoint.class);
045        }
046    
047        public void setQueueSize(int size) {
048            queueSize = size;
049        }
050        
051        public int getQueueSize() {
052            return queueSize;
053        }
054        
055        public void setConcurrentConsumers(int size) {
056            defaultConcurrentConsumers = size;
057        }
058        
059        public int getConcurrentConsumers() {
060            return defaultConcurrentConsumers;
061        }
062    
063        public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
064            return defaultQueueFactory;
065        }
066    
067        public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
068            this.defaultQueueFactory = defaultQueueFactory;
069        }
070    
071        /**
072         * @deprecated use
073         */
074        @Deprecated
075        public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size) {
076            return getOrCreateQueue(endpoint, size, null);
077        }
078    
079        /**
080         * @deprecated use {@link #getOrCreateQueue(SedaEndpoint, Integer, Boolean, BlockingQueueFactory)}
081         */
082        public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers) {
083            return getOrCreateQueue(endpoint, size, multipleConsumers, null);
084        }
085    
086        public synchronized QueueReference getOrCreateQueue(SedaEndpoint endpoint, Integer size, Boolean multipleConsumers, BlockingQueueFactory<Exchange> customQueueFactory) {
087            String key = getQueueKey(endpoint.getEndpointUri());
088    
089            QueueReference ref = getQueues().get(key);
090            if (ref != null) {
091    
092                // if the given size is not provided, we just use the existing queue as is
093                if (size != null && !size.equals(ref.getSize())) {
094                    // there is already a queue, so make sure the size matches
095                    throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
096                            + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size);
097                }
098                // add the reference before returning queue
099                ref.addReference(endpoint);
100    
101                if (log.isDebugEnabled()) {
102                    log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()});
103                }
104                return ref;
105            }
106    
107            // create queue
108            BlockingQueue<Exchange> queue;
109            BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory;
110            if (size != null && size > 0) {
111                queue = queueFactory.create(size);
112            } else {
113                if (getQueueSize() > 0) {
114                    size = getQueueSize();
115                    queue = queueFactory.create(getQueueSize());
116                } else {
117                    queue = queueFactory.create();
118                }
119            }
120            log.debug("Created queue {} with size {}", key, size);
121    
122            // create and add a new reference queue
123            ref = new QueueReference(queue, size, multipleConsumers);
124            ref.addReference(endpoint);
125            getQueues().put(key, ref);
126    
127            return ref;
128        }
129    
130        public synchronized QueueReference registerQueue(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) {
131            String key = getQueueKey(endpoint.getEndpointUri());
132    
133            QueueReference ref = getQueues().get(key);
134            if (ref == null) {
135                ref = new QueueReference(queue, endpoint.getSize(), endpoint.isMultipleConsumers());
136                ref.addReference(endpoint);
137                getQueues().put(key, ref);
138            }
139    
140            return ref;
141        }
142    
143        public Map<String, QueueReference> getQueues() {
144            return queues;
145        }
146    
147        public QueueReference getQueueReference(String key) {
148            return queues.get(key);
149        }
150    
151        @Override
152        @SuppressWarnings("unchecked")
153        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
154            int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
155            boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
156            if (limitConcurrentConsumers && consumers >  maxConcurrentConsumers) {
157                throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
158                        + maxConcurrentConsumers + " was " + consumers);
159            }
160            // Resolve queue reference
161            BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
162            SedaEndpoint answer;
163            // Resolve queue factory when no queue specified
164            if (queue == null) {
165                BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
166                // defer creating queue till endpoint is started, so we pass the queue factory
167                answer = new SedaEndpoint(uri, this, queueFactory, consumers);
168            } else {
169                answer = new SedaEndpoint(uri, this, queue, consumers);
170            }
171            answer.configureProperties(parameters);
172            return answer;
173        }
174    
175        public String getQueueKey(String uri) {
176            if (uri.contains("?")) {
177                // strip parameters
178                uri = uri.substring(0, uri.indexOf('?'));
179            }
180            return uri;
181        }
182    
183        @Override
184        protected void doStop() throws Exception {
185            getQueues().clear();
186            super.doStop();
187        }
188    
189        /**
190         * On shutting down the endpoint
191         * 
192         * @param endpoint the endpoint
193         */
194        void onShutdownEndpoint(SedaEndpoint endpoint) {
195            // we need to remove the endpoint from the reference counter
196            String key = getQueueKey(endpoint.getEndpointUri());
197            QueueReference ref = getQueues().get(key);
198            if (ref != null && endpoint.getConsumers().size() == 0) {
199                // only remove the endpoint when the consumers are removed
200                ref.removeReference(endpoint);
201                if (ref.getCount() <= 0) {
202                    // reference no longer needed so remove from queues
203                    getQueues().remove(key);
204                }
205            }
206        }
207    
208    }