001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Iterator;
020    import java.util.LinkedHashSet;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.CopyOnWriteArrayList;
026    import java.util.concurrent.ExecutorService;
027    import java.util.concurrent.ScheduledExecutorService;
028    import java.util.concurrent.ThreadFactory;
029    import java.util.concurrent.ThreadPoolExecutor;
030    import java.util.concurrent.TimeUnit;
031    
032    import org.apache.camel.CamelContext;
033    import org.apache.camel.NamedNode;
034    import org.apache.camel.StaticService;
035    import org.apache.camel.ThreadPoolRejectedPolicy;
036    import org.apache.camel.model.OptionalIdentifiedDefinition;
037    import org.apache.camel.model.ProcessorDefinition;
038    import org.apache.camel.model.ProcessorDefinitionHelper;
039    import org.apache.camel.model.RouteDefinition;
040    import org.apache.camel.spi.ExecutorServiceManager;
041    import org.apache.camel.spi.LifecycleStrategy;
042    import org.apache.camel.spi.ThreadPoolFactory;
043    import org.apache.camel.spi.ThreadPoolProfile;
044    import org.apache.camel.support.ServiceSupport;
045    import org.apache.camel.util.ObjectHelper;
046    import org.apache.camel.util.StopWatch;
047    import org.apache.camel.util.TimeUtils;
048    import org.apache.camel.util.URISupport;
049    import org.apache.camel.util.concurrent.CamelThreadFactory;
050    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
051    import org.apache.camel.util.concurrent.ThreadHelper;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * @version 
057     */
058    public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
059        private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);
060    
061        private final CamelContext camelContext;
062        private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
063        private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<ExecutorService>();
064        private String threadNamePattern;
065        private long shutdownAwaitTermination = 10000;
066        private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
067        private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<String, ThreadPoolProfile>();
068        private ThreadPoolProfile defaultProfile;
069    
070        public DefaultExecutorServiceManager(CamelContext camelContext) {
071            this.camelContext = camelContext;
072    
073            defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
074            defaultProfile.setDefaultProfile(true);
075            defaultProfile.setPoolSize(10);
076            defaultProfile.setMaxPoolSize(20);
077            defaultProfile.setKeepAliveTime(60L);
078            defaultProfile.setTimeUnit(TimeUnit.SECONDS);
079            defaultProfile.setMaxQueueSize(1000);
080            defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
081    
082            registerThreadPoolProfile(defaultProfile);
083        }
084    
085        @Override
086        public ThreadPoolFactory getThreadPoolFactory() {
087            return threadPoolFactory;
088        }
089    
090        @Override
091        public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
092            this.threadPoolFactory = threadPoolFactory;
093        }
094    
095        @Override
096        public void registerThreadPoolProfile(ThreadPoolProfile profile) {
097            ObjectHelper.notNull(profile, "profile");
098            ObjectHelper.notEmpty(profile.getId(), "id", profile);
099            threadPoolProfiles.put(profile.getId(), profile);
100        }
101    
102        @Override
103        public ThreadPoolProfile getThreadPoolProfile(String id) {
104            return threadPoolProfiles.get(id);
105        }
106    
107        @Override
108        public ThreadPoolProfile getDefaultThreadPoolProfile() {
109            return getThreadPoolProfile(defaultThreadPoolProfileId);
110        }
111    
112        @Override
113        public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
114            threadPoolProfiles.remove(defaultThreadPoolProfileId);
115            defaultThreadPoolProfile.addDefaults(defaultProfile);
116    
117            LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile);
118    
119            this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
120            defaultThreadPoolProfile.setDefaultProfile(true);
121            registerThreadPoolProfile(defaultThreadPoolProfile);
122        }
123    
124        @Override
125        public String getThreadNamePattern() {
126            return threadNamePattern;
127        }
128    
129        @Override
130        public void setThreadNamePattern(String threadNamePattern) {
131            // must set camel id here in the pattern and let the other placeholders be resolved on demand
132            String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
133            this.threadNamePattern = name;
134        }
135    
136        @Override
137        public long getShutdownAwaitTermination() {
138            return shutdownAwaitTermination;
139        }
140    
141        @Override
142        public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
143            this.shutdownAwaitTermination = shutdownAwaitTermination;
144        }
145    
146        @Override
147        public String resolveThreadName(String name) {
148            return ThreadHelper.resolveThreadName(threadNamePattern, name);
149        }
150    
151        @Override
152        public Thread newThread(String name, Runnable runnable) {
153            ThreadFactory factory = createThreadFactory(name, true);
154            return factory.newThread(runnable);
155        }
156    
157        @Override
158        public ExecutorService newDefaultThreadPool(Object source, String name) {
159            return newThreadPool(source, name, getDefaultThreadPoolProfile());
160        }
161    
162        @Override
163        public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
164            return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
165        }
166    
167        @Override
168        public ExecutorService newThreadPool(Object source, String name, String profileId) {
169            ThreadPoolProfile profile = getThreadPoolProfile(profileId);
170            if (profile != null) {
171                return newThreadPool(source, name, profile);
172            } else {
173                // no profile with that id
174                return null;
175            }
176        }
177    
178        @Override
179        public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) {
180            String sanitizedName = URISupport.sanitizeUri(name);
181            ObjectHelper.notNull(profile, "ThreadPoolProfile");
182    
183            ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
184            profile.addDefaults(defaultProfile);
185    
186            ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
187            ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
188            onThreadPoolCreated(executorService, source, profile.getId());
189            if (LOG.isDebugEnabled()) {
190                LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService});
191            }
192    
193            return executorService;
194        }
195    
196        @Override
197        public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
198            ThreadPoolProfile profile = new ThreadPoolProfile(name);
199            profile.setPoolSize(poolSize);
200            profile.setMaxPoolSize(maxPoolSize);
201            return  newThreadPool(source, name, profile);
202        }
203    
204        @Override
205        public ExecutorService newSingleThreadExecutor(Object source, String name) {
206            return newFixedThreadPool(source, name, 1);
207        }
208    
209        @Override
210        public ExecutorService newCachedThreadPool(Object source, String name) {
211            String sanitizedName = URISupport.sanitizeUri(name);
212            ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
213            onThreadPoolCreated(answer, source, null);
214    
215            if (LOG.isDebugEnabled()) {
216                LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
217            }
218            return answer;
219        }
220    
221        @Override
222        public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
223            ThreadPoolProfile profile = new ThreadPoolProfile(name);
224            profile.setPoolSize(poolSize);
225            profile.setMaxPoolSize(poolSize);
226            profile.setKeepAliveTime(0L);
227            return newThreadPool(source, name, profile);
228        }
229    
230        @Override
231        public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
232            return newScheduledThreadPool(source, name, 1);
233        }
234        
235        @Override
236        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
237            String sanitizedName = URISupport.sanitizeUri(name);
238            profile.addDefaults(getDefaultThreadPoolProfile());
239            ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true));
240            onThreadPoolCreated(answer, source, null);
241    
242            if (LOG.isDebugEnabled()) {
243                LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
244            }
245            return answer;
246        }
247    
248        @Override
249        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) {
250            ThreadPoolProfile profile = getThreadPoolProfile(profileId);
251            if (profile != null) {
252                return newScheduledThreadPool(source, name, profile);
253            } else {
254                // no profile with that id
255                return null;
256            }
257        }
258    
259        @Override
260        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
261            ThreadPoolProfile profile = new ThreadPoolProfile(name);
262            profile.setPoolSize(poolSize);
263            return newScheduledThreadPool(source, name, profile);
264        }
265    
266        @Override
267        public void shutdown(ExecutorService executorService) {
268            doShutdown(executorService, 0, false);
269        }
270    
271        @Override
272        public void shutdownGraceful(ExecutorService executorService) {
273            doShutdown(executorService, getShutdownAwaitTermination(), false);
274        }
275    
276        @Override
277        public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
278            doShutdown(executorService, shutdownAwaitTermination, false);
279        }
280    
281        private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) {
282            if (executorService == null) {
283                return false;
284            }
285    
286            boolean warned = false;
287    
288            // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
289            // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
290            // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
291            // we ought to shutdown much faster)
292            if (!executorService.isShutdown()) {
293                StopWatch watch = new StopWatch();
294    
295                LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
296                executorService.shutdown();
297    
298                if (shutdownAwaitTermination > 0) {
299                    try {
300                        if (!awaitTermination(executorService, shutdownAwaitTermination)) {
301                            warned = true;
302                            LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
303                            executorService.shutdownNow();
304                            // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
305                            if (!awaitTermination(executorService, shutdownAwaitTermination)) {
306                                LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
307                            }
308                        }
309                    } catch (InterruptedException e) {
310                        warned = true;
311                        LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
312                        // we were interrupted during shutdown, so force shutdown
313                        executorService.shutdownNow();
314                    }
315                }
316    
317                // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
318                if (warned) {
319                    LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
320                            new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
321                } else if (LOG.isDebugEnabled()) {
322                    LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
323                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
324                }
325            }
326    
327            // let lifecycle strategy be notified as well which can let it be managed in JMX as well
328            ThreadPoolExecutor threadPool = null;
329            if (executorService instanceof ThreadPoolExecutor) {
330                threadPool = (ThreadPoolExecutor) executorService;
331            } else if (executorService instanceof SizedScheduledExecutorService) {
332                threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
333            }
334            if (threadPool != null) {
335                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
336                    lifecycle.onThreadPoolRemove(camelContext, threadPool);
337                }
338            }
339    
340            // remove reference as its shutdown (do not remove if fail-safe)
341            if (!failSafe) {
342                executorServices.remove(executorService);
343            }
344    
345            return warned;
346        }
347    
348        @Override
349        public List<Runnable> shutdownNow(ExecutorService executorService) {
350            return doShutdownNow(executorService, false);
351        }
352    
353        private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
354            ObjectHelper.notNull(executorService, "executorService");
355    
356            List<Runnable> answer = null;
357            if (!executorService.isShutdown()) {
358                if (failSafe) {
359                    // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
360                    LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
361                } else {
362                    LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
363                }
364                answer = executorService.shutdownNow();
365                if (LOG.isTraceEnabled()) {
366                    LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
367                            new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
368                }
369            }
370    
371            // let lifecycle strategy be notified as well which can let it be managed in JMX as well
372            ThreadPoolExecutor threadPool = null;
373            if (executorService instanceof ThreadPoolExecutor) {
374                threadPool = (ThreadPoolExecutor) executorService;
375            } else if (executorService instanceof SizedScheduledExecutorService) {
376                threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
377            }
378            if (threadPool != null) {
379                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
380                    lifecycle.onThreadPoolRemove(camelContext, threadPool);
381                }
382            }
383    
384            // remove reference as its shutdown (do not remove if fail-safe)
385            if (!failSafe) {
386                executorServices.remove(executorService);
387            }
388    
389            return answer;
390        }
391    
392        @Override
393        public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
394            // log progress every 2nd second so end user is aware of we are shutting down
395            StopWatch watch = new StopWatch();
396            long interval = Math.min(2000, shutdownAwaitTermination);
397            boolean done = false;
398            while (!done && interval > 0) {
399                if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
400                    done = true;
401                } else {
402                    LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
403                    // recalculate interval
404                    interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
405                }
406            }
407    
408            return done;
409        }
410    
411        /**
412         * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
413         *
414         * @param executorService the created {@link java.util.concurrent.ExecutorService}
415         */
416        protected void onNewExecutorService(ExecutorService executorService) {
417            // noop
418        }
419    
420        @Override
421        protected void doStart() throws Exception {
422            if (threadNamePattern == null) {
423                // set default name pattern which includes the camel context name
424                threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#";
425            }
426        }
427    
428        @Override
429        protected void doStop() throws Exception {
430            // noop
431        }
432    
433        @Override
434        protected void doShutdown() throws Exception {
435            // shutdown all remainder executor services by looping and doing this aggressively
436            // as by normal all threads pool should have been shutdown using proper lifecycle
437            // by their EIPs, components etc. This is acting as a fail-safe during shutdown
438            // of CamelContext itself.
439            Set<ExecutorService> forced = new LinkedHashSet<ExecutorService>();
440            if (!executorServices.isEmpty()) {
441                // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also
442                LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size());
443                for (ExecutorService executorService : executorServices) {
444                    try {
445                        boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true);
446                        // remember the thread pools that was forced to shutdown (eg warned)
447                        if (warned) {
448                            forced.add(executorService);
449                        }
450                    } catch (Throwable e) {
451                        // only log if something goes wrong as we want to shutdown them all
452                        LOG.warn("Error occurred during shutdown of ExecutorService: "
453                                + executorService + ". This exception will be ignored.", e);
454                    }
455                }
456            }
457    
458            // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his
459            if (!forced.isEmpty()) {
460                LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size());
461                for (ExecutorService executorService : forced) {
462                    LOG.warn("  forced -> {}", executorService);
463                }
464            }
465            forced.clear();
466    
467            // clear list
468            executorServices.clear();
469    
470            // do not clear the default profile as we could potential be restarted
471            Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator();
472            while (it.hasNext()) {
473                ThreadPoolProfile profile = it.next();
474                if (!profile.isDefaultProfile()) {
475                    it.remove();
476                }
477            }
478        }
479    
480        /**
481         * Invoked when a new thread pool is created.
482         * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext,
483         * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method,
484         * which for example will enlist the thread pool in JMX management.
485         *
486         * @param executorService the thread pool
487         * @param source          the source to use the thread pool
488         * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile
489         */
490        private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) {
491            // add to internal list of thread pools
492            executorServices.add(executorService);
493    
494            String id;
495            String sourceId = null;
496            String routeId = null;
497    
498            // extract id from source
499            if (source instanceof NamedNode) {
500                id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory());
501                // and let source be the short name of the pattern
502                sourceId = ((NamedNode) source).getShortName();
503            } else if (source instanceof String) {
504                id = (String) source;
505            } else if (source != null) {
506                if (source instanceof StaticService) {
507                    // the source is static service so its name would be unique
508                    id = source.getClass().getSimpleName();
509                } else {
510                    // fallback and use the simple class name with hashcode for the id so its unique for this given source
511                    id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
512                }
513            } else {
514                // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique
515                id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")";
516            }
517    
518            // id is mandatory
519            ObjectHelper.notEmpty(id, "id for thread pool " + executorService);
520    
521            // extract route id if possible
522            if (source instanceof ProcessorDefinition) {
523                RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
524                if (route != null) {
525                    routeId = route.idOrCreate(this.camelContext.getNodeIdFactory());
526                }
527            }
528    
529            // let lifecycle strategy be notified as well which can let it be managed in JMX as well
530            ThreadPoolExecutor threadPool = null;
531            if (executorService instanceof ThreadPoolExecutor) {
532                threadPool = (ThreadPoolExecutor) executorService;
533            } else if (executorService instanceof SizedScheduledExecutorService) {
534                threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
535            }
536            if (threadPool != null) {
537                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
538                    lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
539                }
540            }
541    
542            // now call strategy to allow custom logic
543            onNewExecutorService(executorService);
544        }
545    
546        private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
547            ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon);
548            return threadFactory;
549        }
550    
551    }