001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.BlockingQueue;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    import java.util.concurrent.LinkedBlockingQueue;
023    import java.util.concurrent.RejectedExecutionHandler;
024    import java.util.concurrent.ScheduledExecutorService;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.SynchronousQueue;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.ThreadPoolExecutor;
029    import java.util.concurrent.TimeUnit;
030    
031    import org.apache.camel.spi.ThreadPoolFactory;
032    import org.apache.camel.spi.ThreadPoolProfile;
033    import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
034    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
035    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
036    
037    /**
038     * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
039     */
040    public class DefaultThreadPoolFactory implements ThreadPoolFactory {
041    
042        public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
043            return Executors.newCachedThreadPool(threadFactory);
044        }
045        
046        @Override
047        public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
048            return newThreadPool(profile.getPoolSize(), 
049                                 profile.getMaxPoolSize(), 
050                                 profile.getKeepAliveTime(),
051                                 profile.getTimeUnit(),
052                                 profile.getMaxQueueSize(), 
053                                 profile.getRejectedExecutionHandler(),
054                                 factory);
055        }
056    
057        public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
058                                             int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
059                                             ThreadFactory threadFactory) throws IllegalArgumentException {
060    
061            // the core pool size must be higher than 0
062            if (corePoolSize < 1) {
063                throw new IllegalArgumentException("CorePoolSize must be >= 1, was " + corePoolSize);
064            }
065    
066            // validate max >= core
067            if (maxPoolSize < corePoolSize) {
068                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
069            }
070    
071            BlockingQueue<Runnable> workQueue;
072            if (corePoolSize == 0 && maxQueueSize <= 0) {
073                // use a synchronous queue for direct-handover (no tasks stored on the queue)
074                workQueue = new SynchronousQueue<Runnable>();
075                // and force 1 as pool size to be able to create the thread pool by the JDK
076                corePoolSize = 1;
077                maxPoolSize = 1;
078            } else if (maxQueueSize <= 0) {
079                // use a synchronous queue for direct-handover (no tasks stored on the queue)
080                workQueue = new SynchronousQueue<Runnable>();
081            } else {
082                // bounded task queue to store tasks on the queue
083                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
084            }
085    
086            ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
087            answer.setThreadFactory(threadFactory);
088            if (rejectedExecutionHandler == null) {
089                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
090            }
091            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
092            return answer;
093        }
094        
095        @Override
096        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
097            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
098            if (rejectedExecutionHandler == null) {
099                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
100            }
101    
102            ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
103            // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
104    
105            // need to wrap the thread pool in a sized to guard against the problem that the
106            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
107            // we could potentially keep adding tasks, and run out of memory.
108            if (profile.getMaxPoolSize() > 0) {
109                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
110            } else {
111                return answer;
112            }
113        }
114    
115    }