001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util.concurrent; 018 019 import java.util.Collection; 020 import java.util.List; 021 import java.util.concurrent.Callable; 022 import java.util.concurrent.ExecutionException; 023 import java.util.concurrent.Future; 024 import java.util.concurrent.RejectedExecutionException; 025 import java.util.concurrent.RejectedExecutionHandler; 026 import java.util.concurrent.ScheduledExecutorService; 027 import java.util.concurrent.ScheduledFuture; 028 import java.util.concurrent.ScheduledThreadPoolExecutor; 029 import java.util.concurrent.ThreadFactory; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.TimeoutException; 032 033 import org.slf4j.Logger; 034 import org.slf4j.LoggerFactory; 035 036 /** 037 * A sized {@link ScheduledExecutorService} which will reject executing tasks if the task queue is full. 038 * <p/> 039 * The {@link ScheduledThreadPoolExecutor} which is the default implementation of the {@link ScheduledExecutorService} 040 * has unbounded task queue, which mean you can keep scheduling tasks which may cause the system to run out of memory. 041 * <p/> 042 * This class is a wrapped for {@link ScheduledThreadPoolExecutor} to reject executing tasks if an upper limit 043 * of the task queue has been reached. 044 */ 045 public class SizedScheduledExecutorService implements ScheduledExecutorService { 046 047 private static final Logger LOG = LoggerFactory.getLogger(SizedScheduledExecutorService.class); 048 private final ScheduledThreadPoolExecutor delegate; 049 private final long queueSize; 050 051 /** 052 * Creates a new sized {@link ScheduledExecutorService} with the given queue size as upper task limit. 053 * 054 * @param delegate the delegate of the actual thread pool implementation 055 * @param queueSize the upper queue size, use 0 or negative value for unlimited 056 */ 057 public SizedScheduledExecutorService(ScheduledThreadPoolExecutor delegate, long queueSize) { 058 this.delegate = delegate; 059 this.queueSize = queueSize; 060 } 061 062 /** 063 * Gets the wrapped {@link ScheduledThreadPoolExecutor} 064 */ 065 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 066 return delegate; 067 } 068 069 @Override 070 public <V> ScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit timeUnit) { 071 if (canScheduleOrExecute()) { 072 return delegate.schedule(task, delay, timeUnit); 073 } else { 074 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 075 } 076 } 077 078 @Override 079 public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit timeUnit) { 080 if (canScheduleOrExecute()) { 081 return delegate.schedule(task, delay, timeUnit); 082 } else { 083 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 084 } 085 } 086 087 @Override 088 public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit timeUnit) { 089 if (canScheduleOrExecute()) { 090 return delegate.scheduleAtFixedRate(task, initialDelay, period, timeUnit); 091 } else { 092 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 093 } 094 } 095 096 @Override 097 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long period, TimeUnit timeUnit) { 098 if (canScheduleOrExecute()) { 099 return delegate.scheduleWithFixedDelay(task, initialDelay, period, timeUnit); 100 } else { 101 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 102 } 103 } 104 105 @Override 106 public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { 107 return delegate.awaitTermination(timeout, timeUnit); 108 } 109 110 public int getActiveCount() { 111 return delegate.getActiveCount(); 112 } 113 114 public long getCompletedTaskCount() { 115 return delegate.getCompletedTaskCount(); 116 } 117 118 public int getCorePoolSize() { 119 return delegate.getCorePoolSize(); 120 } 121 122 public long getKeepAliveTime(TimeUnit timeUnit) { 123 return delegate.getKeepAliveTime(timeUnit); 124 } 125 126 public int getLargestPoolSize() { 127 return delegate.getLargestPoolSize(); 128 } 129 130 public int getMaximumPoolSize() { 131 return delegate.getMaximumPoolSize(); 132 } 133 134 public int getPoolSize() { 135 return delegate.getPoolSize(); 136 } 137 138 public RejectedExecutionHandler getRejectedExecutionHandler() { 139 return delegate.getRejectedExecutionHandler(); 140 } 141 142 public long getTaskCount() { 143 return delegate.getTaskCount(); 144 } 145 146 public ThreadFactory getThreadFactory() { 147 return delegate.getThreadFactory(); 148 } 149 150 @Override 151 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { 152 if (canScheduleOrExecute()) { 153 return delegate.invokeAll(tasks); 154 } else { 155 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 156 } 157 } 158 159 @Override 160 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException { 161 if (canScheduleOrExecute()) { 162 return delegate.invokeAll(tasks, timeout, timeUnit); 163 } else { 164 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 165 } 166 } 167 168 @Override 169 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { 170 if (canScheduleOrExecute()) { 171 return delegate.invokeAny(tasks); 172 } else { 173 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 174 } 175 } 176 177 @Override 178 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { 179 if (canScheduleOrExecute()) { 180 return delegate.invokeAny(tasks, timeout, timeUnit); 181 } else { 182 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 183 } 184 } 185 186 @Override 187 public boolean isShutdown() { 188 return delegate.isShutdown(); 189 } 190 191 @Override 192 public boolean isTerminated() { 193 return delegate.isTerminated(); 194 } 195 196 public boolean isTerminating() { 197 return delegate.isTerminating(); 198 } 199 200 public int prestartAllCoreThreads() { 201 return delegate.prestartAllCoreThreads(); 202 } 203 204 public boolean prestartCoreThread() { 205 return delegate.prestartCoreThread(); 206 } 207 208 public void purge() { 209 delegate.purge(); 210 } 211 212 public void setCorePoolSize(int corePoolSize) { 213 delegate.setCorePoolSize(corePoolSize); 214 } 215 216 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) { 217 delegate.setKeepAliveTime(keepAliveTime, timeUnit); 218 } 219 220 public void setMaximumPoolSize(int maximumPoolSize) { 221 delegate.setMaximumPoolSize(maximumPoolSize); 222 } 223 224 public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { 225 delegate.setRejectedExecutionHandler(rejectedExecutionHandler); 226 } 227 228 public void setThreadFactory(ThreadFactory threadFactory) { 229 delegate.setThreadFactory(threadFactory); 230 } 231 232 @Override 233 public void shutdown() { 234 delegate.shutdown(); 235 } 236 237 @Override 238 public List<Runnable> shutdownNow() { 239 return delegate.shutdownNow(); 240 } 241 242 @Override 243 public <T> Future<T> submit(Callable<T> task) { 244 if (canScheduleOrExecute()) { 245 return delegate.submit(task); 246 } else { 247 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 248 } 249 } 250 251 @Override 252 public Future<?> submit(Runnable task) { 253 if (canScheduleOrExecute()) { 254 return delegate.submit(task); 255 } else { 256 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 257 } 258 } 259 260 @Override 261 public <T> Future<T> submit(Runnable task, T result) { 262 if (canScheduleOrExecute()) { 263 return delegate.submit(task, result); 264 } else { 265 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 266 } 267 } 268 269 @Override 270 public void execute(Runnable task) { 271 if (canScheduleOrExecute()) { 272 delegate.execute(task); 273 } else { 274 throw new RejectedExecutionException("Task rejected due queue size limit reached"); 275 } 276 } 277 278 public void allowCoreThreadTimeOut(boolean value) { 279 delegate.allowCoreThreadTimeOut(value); 280 } 281 282 public boolean allowsCoreThreadTimeOut() { 283 return delegate.allowsCoreThreadTimeOut(); 284 } 285 286 /** 287 * Can the task be scheduled or executed? 288 * 289 * @return <tt>true</tt> to accept, <tt>false</tt> to not accept 290 */ 291 protected boolean canScheduleOrExecute() { 292 if (queueSize <= 0) { 293 return true; 294 } 295 296 int size = delegate.getQueue().size(); 297 boolean answer = size < queueSize; 298 if (LOG.isTraceEnabled()) { 299 LOG.trace("canScheduleOrExecute {} < {} -> {}", new Object[]{size, queueSize, answer}); 300 } 301 return answer; 302 } 303 304 @Override 305 public String toString() { 306 // the thread factory often have more precise details what the thread pool is used for 307 if (delegate.getThreadFactory() instanceof CamelThreadFactory) { 308 String name = ((CamelThreadFactory) delegate.getThreadFactory()).getName(); 309 return super.toString() + "[" + name + "]"; 310 } else { 311 return super.toString(); 312 } 313 } 314 }