001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.LinkedList;
020    import java.util.List;
021    import java.util.concurrent.BlockingQueue;
022    
023    import org.apache.camel.Exchange;
024    
025    /**
026     * Holder for queue references.
027     * <p/>
028     * This is used to keep track of the usages of the queues, so we know when a queue is no longer
029     * in use, and can safely be discarded.
030     */
031    public final class QueueReference {
032    
033        private final BlockingQueue<Exchange> queue;
034        private Integer size;
035        private Boolean multipleConsumers;
036    
037        private List<SedaEndpoint> endpoints = new LinkedList<SedaEndpoint>();
038    
039        QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) {
040            this.queue = queue;
041            this.size = size;
042            this.multipleConsumers = multipleConsumers;
043        }
044    
045        synchronized void addReference(SedaEndpoint endpoint) {
046            if (!endpoints.contains(endpoint)) {
047                endpoints.add(endpoint);
048                // update the multipleConsumers setting if need
049                if (endpoint.isMultipleConsumers()) {
050                    multipleConsumers = true;
051                }
052            }
053        }
054    
055        synchronized void removeReference(SedaEndpoint endpoint) {
056            if (endpoints.contains(endpoint)) {
057                endpoints.remove(endpoint);
058            }
059        }
060    
061        /**
062         * Gets the reference counter
063         */
064        public synchronized int getCount() {
065            return endpoints.size();
066        }
067    
068        /**
069         * Gets the queue size
070         *
071         * @return <tt>null</tt> if unbounded
072         */
073        public Integer getSize() {
074            return size;
075        }
076    
077        public Boolean getMultipleConsumers() {
078            return multipleConsumers;
079        }
080    
081        /**
082         * Gets the queue
083         */
084        public BlockingQueue<Exchange> getQueue() {
085            return queue;
086        }
087    
088        public synchronized boolean hasConsumers() {
089            for (SedaEndpoint endpoint : endpoints) {
090                if (endpoint.getConsumers().size() > 0) {
091                    return true;
092                }
093            }
094    
095            return false;
096        }
097    }