/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;

/**
 * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
 */
public class DefaultThreadPoolFactory implements ThreadPoolFactory {

    /**
     * Creates a new cached thread pool by delegating to
     * {@link Executors#newCachedThreadPool(ThreadFactory)}.
     *
     * @param threadFactory the factory used to create the pool's threads
     * @return the created thread pool
     */
    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return Executors.newCachedThreadPool(threadFactory);
    }

    /**
     * Creates a new thread pool from the settings of the given profile.
     * <p/>
     * All sizing, keep-alive, queue and rejection settings are read from the profile and
     * passed on to
     * {@link #newThreadPool(int, int, long, TimeUnit, int, RejectedExecutionHandler, ThreadFactory)}.
     *
     * @param profile the profile holding the thread pool settings
     * @param factory the factory used to create the pool's threads
     * @return the created thread pool
     */
    @Override
    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
        return newThreadPool(profile.getPoolSize(),
                             profile.getMaxPoolSize(),
                             profile.getKeepAliveTime(),
                             profile.getTimeUnit(),
                             profile.getMaxQueueSize(),
                             profile.getRejectedExecutionHandler(),
                             factory);
    }

    /**
     * Creates a new thread pool from explicit settings.
     * <p/>
     * The work queue is selected from <tt>maxQueueSize</tt>: a value of <tt>0</tt> or less
     * selects a {@link SynchronousQueue} (direct hand-over, no tasks are stored), any other value
     * selects a {@link LinkedBlockingQueue} bounded to that capacity. When both
     * <tt>corePoolSize</tt> is <tt>0</tt> and <tt>maxQueueSize</tt> is <tt>0</tt> or less, the
     * core and maximum pool sizes are forced to <tt>1</tt>.
     *
     * @param corePoolSize             the core pool size, must be <tt>0</tt> or higher
     * @param maxPoolSize              the maximum pool size, must be at least <tt>corePoolSize</tt>
     * @param keepAliveTime            the keep alive time passed on to the created executor
     * @param timeUnit                 the unit of <tt>keepAliveTime</tt>
     * @param maxQueueSize             the capacity of the task queue, <tt>0</tt> or less for no queue
     * @param rejectedExecutionHandler the handler for rejected tasks, if <tt>null</tt> then
     *                                 {@link ThreadPoolExecutor.CallerRunsPolicy} is used
     * @param threadFactory            the factory used to create the pool's threads
     * @return the created thread pool
     * @throws IllegalArgumentException if <tt>corePoolSize</tt> is negative, or
     *                                  <tt>maxPoolSize</tt> is lower than <tt>corePoolSize</tt>
     */
    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
                                         int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
                                         ThreadFactory threadFactory) throws IllegalArgumentException {

        // the core pool size must be 0 or higher; 0 has to pass this check as it is
        // explicitly handled below when selecting the work queue
        if (corePoolSize < 0) {
            throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
        }

        // validate max >= core
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
        }

        BlockingQueue<Runnable> workQueue;
        if (corePoolSize == 0 && maxQueueSize <= 0) {
            // use a synchronous queue for direct-handover (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
            // and force 1 as pool size to be able to create the thread pool by the JDK
            corePoolSize = 1;
            maxPoolSize = 1;
        } else if (maxQueueSize <= 0) {
            // use a synchronous queue for direct-handover (no tasks stored on the queue)
            workQueue = new SynchronousQueue<Runnable>();
        } else {
            // bounded task queue to store tasks on the queue
            workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }

        ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
        answer.setThreadFactory(threadFactory);
        // fallback to let the caller thread run the task if no handler was configured
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
        return answer;
    }

    /**
     * Creates a new scheduled thread pool from the settings of the given profile.
     * <p/>
     * The pool size and the rejected execution handler are read from the profile; if the profile
     * has no handler then {@link ThreadPoolExecutor.CallerRunsPolicy} is used.
     *
     * @param profile       the profile holding the thread pool settings
     * @param threadFactory the factory used to create the pool's threads
     * @return the created scheduled thread pool, wrapped in a
     *         {@link SizedScheduledExecutorService} when the profile's max pool size is positive
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
        RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)

        // need to wrap the thread pool in a sized to guard against the problem that the
        // JDK created thread pool has an unbounded queue (see class javadoc), which mean
        // we could potentially keep adding tasks, and run out of memory.
        // NOTE(review): the guard tests getMaxPoolSize() while the wrapper is sized with
        // getMaxQueueSize() - confirm that the max pool size is the intended condition
        if (profile.getMaxPoolSize() > 0) {
            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
        } else {
            return answer;
        }
    }

}