001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util.concurrent;
018    
019    import java.util.Collection;
020    import java.util.List;
021    import java.util.concurrent.Callable;
022    import java.util.concurrent.ExecutionException;
023    import java.util.concurrent.Future;
024    import java.util.concurrent.RejectedExecutionException;
025    import java.util.concurrent.RejectedExecutionHandler;
026    import java.util.concurrent.ScheduledExecutorService;
027    import java.util.concurrent.ScheduledFuture;
028    import java.util.concurrent.ScheduledThreadPoolExecutor;
029    import java.util.concurrent.ThreadFactory;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.TimeoutException;
032    
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * A sized {@link ScheduledExecutorService} which will reject executing tasks if the task queue is full.
038     * <p/>
039     * The {@link ScheduledThreadPoolExecutor} which is the default implementation of the {@link ScheduledExecutorService}
040     * has unbounded task queue, which mean you can keep scheduling tasks which may cause the system to run out of memory.
041     * <p/>
042     * This class is a wrapped for {@link ScheduledThreadPoolExecutor} to reject executing tasks if an upper limit
043     * of the task queue has been reached.
044     */
045    public class SizedScheduledExecutorService implements ScheduledExecutorService {
046        
047        private static final Logger LOG = LoggerFactory.getLogger(SizedScheduledExecutorService.class); 
048        private final ScheduledThreadPoolExecutor delegate;
049        private final long queueSize;
050    
051        /**
052         * Creates a new sized {@link ScheduledExecutorService} with the given queue size as upper task limit.
053         *
054         * @param delegate   the delegate of the actual thread pool implementation
055         * @param queueSize  the upper queue size, use 0 or negative value for unlimited
056         */
057        public SizedScheduledExecutorService(ScheduledThreadPoolExecutor delegate, long queueSize) {
058            this.delegate = delegate;
059            this.queueSize = queueSize;
060        }
061    
062        /**
063         * Gets the wrapped {@link ScheduledThreadPoolExecutor}
064         */
065        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
066            return delegate;
067        }
068    
069        @Override
070        public <V> ScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit timeUnit) {
071            if (canScheduleOrExecute()) {
072                return delegate.schedule(task, delay, timeUnit);
073            } else {
074                throw new RejectedExecutionException("Task rejected due queue size limit reached");
075            }
076        }
077    
078        @Override
079        public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit timeUnit) {
080            if (canScheduleOrExecute()) {
081                return delegate.schedule(task, delay, timeUnit);
082            } else {
083                throw new RejectedExecutionException("Task rejected due queue size limit reached");
084            }
085        }
086    
087        @Override
088        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
089            if (canScheduleOrExecute()) {
090                return delegate.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
091            } else {
092                throw new RejectedExecutionException("Task rejected due queue size limit reached");
093            }
094        }
095    
096        @Override
097        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
098            if (canScheduleOrExecute()) {
099                return delegate.scheduleWithFixedDelay(task, initialDelay, period, timeUnit);
100            } else {
101                throw new RejectedExecutionException("Task rejected due queue size limit reached");
102            }
103        }
104    
105        @Override
106        public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException {
107            return delegate.awaitTermination(timeout, timeUnit);
108        }
109    
110        public int getActiveCount() {
111            return delegate.getActiveCount();
112        }
113    
114        public long getCompletedTaskCount() {
115            return delegate.getCompletedTaskCount();
116        }
117    
118        public int getCorePoolSize() {
119            return delegate.getCorePoolSize();
120        }
121    
122        public long getKeepAliveTime(TimeUnit timeUnit) {
123            return delegate.getKeepAliveTime(timeUnit);
124        }
125    
126        public int getLargestPoolSize() {
127            return delegate.getLargestPoolSize();
128        }
129    
130        public int getMaximumPoolSize() {
131            return delegate.getMaximumPoolSize();
132        }
133    
134        public int getPoolSize() {
135            return delegate.getPoolSize();
136        }
137    
138        public RejectedExecutionHandler getRejectedExecutionHandler() {
139            return delegate.getRejectedExecutionHandler();
140        }
141    
142        public long getTaskCount() {
143            return delegate.getTaskCount();
144        }
145    
146        public ThreadFactory getThreadFactory() {
147            return delegate.getThreadFactory();
148        }
149    
150        @Override
151        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
152            if (canScheduleOrExecute()) {
153                return delegate.invokeAll(tasks);
154            } else {
155                throw new RejectedExecutionException("Task rejected due queue size limit reached");
156            }
157        }
158    
159        @Override
160        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException {
161            if (canScheduleOrExecute()) {
162                return delegate.invokeAll(tasks, timeout, timeUnit);
163            } else {
164                throw new RejectedExecutionException("Task rejected due queue size limit reached");
165            }
166        }
167    
168        @Override
169        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
170            if (canScheduleOrExecute()) {
171                return delegate.invokeAny(tasks);
172            } else {
173                throw new RejectedExecutionException("Task rejected due queue size limit reached");
174            }
175        }
176    
177        @Override
178        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
179            if (canScheduleOrExecute()) {
180                return delegate.invokeAny(tasks, timeout, timeUnit);
181            } else {
182                throw new RejectedExecutionException("Task rejected due queue size limit reached");
183            }
184        }
185    
186        @Override
187        public boolean isShutdown() {
188            return delegate.isShutdown();
189        }
190    
191        @Override
192        public boolean isTerminated() {
193            return delegate.isTerminated();
194        }
195    
196        public boolean isTerminating() {
197            return delegate.isTerminating();
198        }
199    
200        public int prestartAllCoreThreads() {
201            return delegate.prestartAllCoreThreads();
202        }
203    
204        public boolean prestartCoreThread() {
205            return delegate.prestartCoreThread();
206        }
207    
208        public void purge() {
209            delegate.purge();
210        }
211    
212        public void setCorePoolSize(int corePoolSize) {
213            delegate.setCorePoolSize(corePoolSize);
214        }
215    
216        public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
217            delegate.setKeepAliveTime(keepAliveTime, timeUnit);
218        }
219    
220        public void setMaximumPoolSize(int maximumPoolSize) {
221            delegate.setMaximumPoolSize(maximumPoolSize);
222        }
223    
224        public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
225            delegate.setRejectedExecutionHandler(rejectedExecutionHandler);
226        }
227    
228        public void setThreadFactory(ThreadFactory threadFactory) {
229            delegate.setThreadFactory(threadFactory);
230        }
231    
232        @Override
233        public void shutdown() {
234            delegate.shutdown();
235        }
236    
237        @Override
238        public List<Runnable> shutdownNow() {
239            return delegate.shutdownNow();
240        }
241    
242        @Override
243        public <T> Future<T> submit(Callable<T> task) {
244            if (canScheduleOrExecute()) {
245                return delegate.submit(task);
246            } else {
247                throw new RejectedExecutionException("Task rejected due queue size limit reached");
248            }
249        }
250    
251        @Override
252        public Future<?> submit(Runnable task) {
253            if (canScheduleOrExecute()) {
254                return delegate.submit(task);
255            } else {
256                throw new RejectedExecutionException("Task rejected due queue size limit reached");
257            }
258        }
259    
260        @Override
261        public <T> Future<T> submit(Runnable task, T result) {
262            if (canScheduleOrExecute()) {
263                return delegate.submit(task, result);
264            } else {
265                throw new RejectedExecutionException("Task rejected due queue size limit reached");
266            }
267        }
268    
269        @Override
270        public void execute(Runnable task) {
271            if (canScheduleOrExecute()) {
272                delegate.execute(task);
273            } else {
274                throw new RejectedExecutionException("Task rejected due queue size limit reached");
275            }
276        }
277    
278        public void allowCoreThreadTimeOut(boolean value) {
279            delegate.allowCoreThreadTimeOut(value);
280        }
281    
282        public boolean allowsCoreThreadTimeOut() {
283            return delegate.allowsCoreThreadTimeOut();
284        }
285    
286        /**
287         * Can the task be scheduled or executed?
288         *
289         * @return <tt>true</tt> to accept, <tt>false</tt> to not accept
290         */
291        protected boolean canScheduleOrExecute() {
292            if (queueSize <= 0) {
293                return true;
294            }
295    
296            int size = delegate.getQueue().size();
297            boolean answer = size < queueSize;
298            if (LOG.isTraceEnabled()) {
299                LOG.trace("canScheduleOrExecute {} < {} -> {}", new Object[]{size, queueSize, answer});
300            }
301            return answer;
302        }
303    
304        @Override
305        public String toString() {
306            // the thread factory often have more precise details what the thread pool is used for
307            if (delegate.getThreadFactory() instanceof CamelThreadFactory) {
308                String name = ((CamelThreadFactory) delegate.getThreadFactory()).getName();
309                return super.toString() + "[" + name + "]";
310            } else {
311                return super.toString();
312            }
313        }
314    }