001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.concurrent.ArrayBlockingQueue;
022    import java.util.concurrent.BlockingQueue;
023    import java.util.concurrent.ConcurrentHashMap;
024    import java.util.concurrent.ConcurrentMap;
025    
026    import org.apache.camel.spi.ServicePool;
027    import org.apache.camel.support.ServiceSupport;
028    import org.apache.camel.util.ServiceHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Default implementation to inherit for a basic service pool.
034     *
035     * @version 
036     */
037    public abstract class DefaultServicePool<Key, Service> extends ServiceSupport implements ServicePool<Key, Service> {
038        protected final Logger log = LoggerFactory.getLogger(getClass());
039        protected final ConcurrentMap<Key, BlockingQueue<Service>> pool = new ConcurrentHashMap<Key, BlockingQueue<Service>>();
040        protected int capacity = 100;
041    
042        protected DefaultServicePool() {
043        }
044    
045        public DefaultServicePool(int capacity) {
046            this.capacity = capacity;
047        }
048    
049        public int getCapacity() {
050            return capacity;
051        }
052    
053        public void setCapacity(int capacity) {
054            this.capacity = capacity;
055        }
056    
057        public synchronized int size() {
058            int size = 0;
059            for (BlockingQueue<Service> entry : pool.values()) {
060                size += entry.size();
061            }
062            return size;
063        }
064    
065        public synchronized Service addAndAcquire(Key key, Service service) {
066            BlockingQueue<Service> entry = pool.get(key);
067            if (entry == null) {
068                entry = new ArrayBlockingQueue<Service>(capacity);
069                pool.put(key, entry);
070            }
071            log.trace("AddAndAcquire key: {} service: {}", key, service);
072    
073            // test if queue will be full
074            if (entry.size() >= capacity) {
075                throw new IllegalStateException("Queue full");
076            }
077            return service;
078        }
079    
080        public synchronized Service acquire(Key key) {
081            BlockingQueue<Service> services = pool.get(key);
082            if (services == null || services.isEmpty()) {
083                log.trace("No free services in pool to acquire for key: {}", key);
084                return null;
085            }
086    
087            Service answer = services.poll();
088            log.trace("Acquire: {} service: {}", key, answer);
089            return answer;
090        }
091    
092        public synchronized void release(Key key, Service service) {
093            log.trace("Release: {} service: {}", key, service);
094            BlockingQueue<Service> services = pool.get(key);
095            if (services != null) {
096                services.add(service);
097            }
098        }
099    
100        public void purge() {
101            pool.clear();
102        }
103    
104        protected void doStart() throws Exception {
105            log.debug("Starting service pool: {}", this);
106        }
107    
108        protected void doStop() throws Exception {
109            log.debug("Stopping service pool: {}", this);
110            for (BlockingQueue<Service> entry : pool.values()) {
111                Collection<Service> values = new ArrayList<Service>();
112                entry.drainTo(values);
113                ServiceHelper.stopServices(values);
114                entry.clear();
115            }
116            pool.clear();
117        }
118    
119    }