001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.LinkedHashSet;
022    import java.util.List;
023    import java.util.Set;
024    import java.util.concurrent.ExecutorService;
025    import java.util.concurrent.ScheduledExecutorService;
026    
027    import org.apache.camel.spi.ExecutorServiceManager;
028    import org.apache.camel.spi.RouteContext;
029    import org.apache.camel.util.ObjectHelper;
030    
031    /**
032     * Helper class for ProcessorDefinition and the other model classes.
033     */
034    public final class ProcessorDefinitionHelper {
035    
036        private ProcessorDefinitionHelper() {
037        }
038    
039        /**
040         * Looks for the given type in the list of outputs and recurring all the children as well.
041         *
042         * @param outputs  list of outputs, can be null or empty.
043         * @param type     the type to look for
044         * @return         the found definitions, or <tt>null</tt> if not found
045         */
046        public static <T> Iterator<T> filterTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) {
047            List<T> found = new ArrayList<T>();
048            doFindType(outputs, type, found);
049            return found.iterator();
050        }
051    
052        /**
053         * Looks for the given type in the list of outputs and recurring all the children as well.
054         * Will stop at first found and return it.
055         *
056         * @param outputs  list of outputs, can be null or empty.
057         * @param type     the type to look for
058         * @return         the first found type, or <tt>null</tt> if not found
059         */
060        public static <T> T findFirstTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) {
061            List<T> found = new ArrayList<T>();
062            doFindType(outputs, type, found);
063            if (found.isEmpty()) {
064                return null;
065            }
066            return found.iterator().next();
067        }
068    
069        /**
070         * Is the given child the first in the outputs from the parent?
071         *
072         * @param parentType the type the parent must be
073         * @param node the node
074         * @return <tt>true</tt> if first child, <tt>false</tt> otherwise
075         */
076        public static boolean isFirstChildOfType(Class<?> parentType, ProcessorDefinition<?> node) {
077            if (node == null || node.getParent() == null) {
078                return false;
079            }
080    
081            if (node.getParent().getOutputs().isEmpty()) {
082                return false;
083            }
084    
085            if (!(node.getParent().getClass().equals(parentType))) {
086                return false;
087            }
088    
089            return node.getParent().getOutputs().get(0).equals(node);
090        }
091    
092        /**
093         * Is the given node parent(s) of the given type
094         * @param parentType   the parent type
095         * @param node         the current node
096         * @param recursive    whether or not to check grand parent(s) as well
097         * @return <tt>true</tt> if parent(s) is of given type, <tt>false</tt> otherwise
098         */
099        public static boolean isParentOfType(Class<?> parentType, ProcessorDefinition<?> node, boolean recursive) {
100            if (node == null || node.getParent() == null) {
101                return false;
102            }
103    
104            if (parentType.isAssignableFrom(node.getParent().getClass())) {
105                return true;
106            } else if (recursive) {
107                // recursive up the tree of parents
108                return isParentOfType(parentType, node.getParent(), true);
109            } else {
110                // no match
111                return false;
112            }
113        }
114    
115        /**
116         * Gets the route definition the given node belongs to.
117         *
118         * @param node the node
119         * @return the route, or <tt>null</tt> if not possible to find
120         */
121        public static RouteDefinition getRoute(ProcessorDefinition<?> node) {
122            if (node == null) {
123                return null;
124            }
125    
126            ProcessorDefinition<?> def = node;
127            // drill to the top
128            while (def != null && def.getParent() != null) {
129                def = def.getParent();
130            }
131    
132            if (def instanceof RouteDefinition) {
133                return (RouteDefinition) def;
134            } else {
135                // not found
136                return null;
137            }
138        }
139    
140        /**
141         * Gets the route id the given node belongs to.
142         *
143         * @param node the node
144         * @return the route id, or <tt>null</tt> if not possible to find
145         */
146        public static String getRouteId(ProcessorDefinition<?> node) {
147            RouteDefinition route = getRoute(node);
148            return route != null ? route.getId() : null;
149        }
150    
151        /**
152         * Traverses the node, including its children (recursive), and gathers all the node ids.
153         *
154         * @param node  the target node
155         * @param set   set to store ids, if <tt>null</tt> a new set will be created
156         * @param onlyCustomId  whether to only store custom assigned ids (ie. {@link org.apache.camel.model.OptionalIdentifiedDefinition#hasCustomIdAssigned()}
157         * @param includeAbstract whether to include abstract nodes (ie. {@link org.apache.camel.model.ProcessorDefinition#isAbstract()}
158         * @return the set with the found ids.
159         */
160        public static Set<String> gatherAllNodeIds(ProcessorDefinition<?> node, Set<String> set,
161                                                   boolean onlyCustomId, boolean includeAbstract) {
162            if (node == null) {
163                return set;
164            }
165    
166            // skip abstract
167            if (node.isAbstract() && !includeAbstract) {
168                return set;
169            }
170    
171            if (set == null) {
172                set = new LinkedHashSet<String>();
173            }
174    
175            // add ourselves
176            if (node.getId() != null) {
177                if (!onlyCustomId || node.hasCustomIdAssigned() && onlyCustomId) {
178                    set.add(node.getId());
179                }
180            }
181    
182            // traverse outputs and recursive children as well
183            List<ProcessorDefinition<?>> children = node.getOutputs();
184            if (children != null && !children.isEmpty()) {
185                for (ProcessorDefinition<?> child : children) {
186                    // traverse children also
187                    gatherAllNodeIds(child, set, onlyCustomId, includeAbstract);
188                }
189            }
190    
191            return set;
192        }
193    
194        @SuppressWarnings({"unchecked", "rawtypes"})
195        private static <T> void doFindType(List<ProcessorDefinition<?>> outputs, Class<T> type, List<T> found) {
196            if (outputs == null || outputs.isEmpty()) {
197                return;
198            }
199    
200            for (ProcessorDefinition out : outputs) {
201                if (type.isInstance(out)) {
202                    found.add((T)out);
203                }
204    
205                // send is much common
206                if (out instanceof SendDefinition) {
207                    SendDefinition send = (SendDefinition) out;
208                    List<ProcessorDefinition<?>> children = send.getOutputs();
209                    doFindType(children, type, found);
210                }
211    
212                // special for choice
213                if (out instanceof ChoiceDefinition) {
214                    ChoiceDefinition choice = (ChoiceDefinition) out;
215                    for (WhenDefinition when : choice.getWhenClauses()) {
216                        List<ProcessorDefinition<?>> children = when.getOutputs();
217                        doFindType(children, type, found);
218                    }
219    
220                    // otherwise is optional
221                    if (choice.getOtherwise() != null) {
222                        List<ProcessorDefinition<?>> children = choice.getOtherwise().getOutputs();
223                        doFindType(children, type, found);
224                    }
225                }
226    
227                // special for try ... catch ... finally
228                if (out instanceof TryDefinition) {
229                    TryDefinition doTry = (TryDefinition) out;
230                    List<ProcessorDefinition<?>> doTryOut = doTry.getOutputsWithoutCatches();
231                    doFindType(doTryOut, type, found);
232    
233                    List<CatchDefinition> doTryCatch = doTry.getCatchClauses();
234                    for (CatchDefinition doCatch : doTryCatch) {
235                        doFindType(doCatch.getOutputs(), type, found);
236                    }
237    
238                    if (doTry.getFinallyClause() != null) {
239                        doFindType(doTry.getFinallyClause().getOutputs(), type, found);
240                    }
241    
242                    // do not check children as we already did that
243                    continue;
244                }
245    
246                // try children as well
247                List<ProcessorDefinition<?>> children = out.getOutputs();
248                doFindType(children, type, found);
249            }
250        }
251    
252        /**
253         * Is there any outputs in the given list.
254         * <p/>
255         * Is used for check if the route output has any real outputs (non abstracts)
256         *
257         * @param outputs           the outputs
258         * @param excludeAbstract   whether or not to exclude abstract outputs (e.g. skip onException etc.)
259         * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is returned
260         */
261        @SuppressWarnings({"unchecked", "rawtypes"})
262        public static boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) {
263            if (outputs == null || outputs.isEmpty()) {
264                return false;
265            }
266            if (!excludeAbstract) {
267                return !outputs.isEmpty();
268            }
269            for (ProcessorDefinition output : outputs) {
270                if (output instanceof TransactedDefinition || output instanceof PolicyDefinition) {
271                    // special for those as they wrap entire output, so we should just check its output
272                    return hasOutputs(output.getOutputs(), excludeAbstract);
273                }
274                if (!output.isAbstract()) {
275                    return true;
276                }
277            }
278            return false;
279        }
280    
281        /**
282         * Determines whether a new thread pool will be created or not.
283         * <p/>
284         * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore
285         * exclusive to the definition.
286         *
287         * @param routeContext   the route context
288         * @param definition     the node definition which may leverage executor service.
289         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
290         * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not
291         * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean)
292         */
293        public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) {
294            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
295            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
296            
297            if (definition.getExecutorService() != null) {
298                // no there is a custom thread pool configured
299                return false;
300            } else if (definition.getExecutorServiceRef() != null) {
301                ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(definition.getExecutorServiceRef(), ExecutorService.class);
302                // if no existing thread pool, then we will have to create a new thread pool
303                return answer == null;
304            } else if (useDefault) {
305                return true;
306            }
307    
308            return false;
309        }
310    
311        /**
312         * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given
313         * <tt>executorServiceRef</tt> name.
314         * <p/>
315         * This method will lookup for configured thread pool in the following order
316         * <ul>
317         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
318         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
319         *   <li>if none found, then <tt>null</tt> is returned.</li>
320         * </ul>
321         * @param routeContext   the route context
322         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
323         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
324         * @param source         the source to use the thread pool
325         * @param executorServiceRef reference name of the thread pool
326         * @return the executor service, or <tt>null</tt> if none was found.
327         */
328        public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name,
329                                                               Object source, String executorServiceRef) {
330    
331            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
332            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
333            ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
334    
335            // lookup in registry first and use existing thread pool if exists
336            ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ExecutorService.class);
337            if (answer == null) {
338                // then create a thread pool assuming the ref is a thread pool profile id
339                answer = manager.newThreadPool(source, name, executorServiceRef);
340            }
341            return answer;
342        }
343    
344        /**
345         * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition.
346         * <p/>
347         * This method will lookup for configured thread pool in the following order
348         * <ul>
349         *   <li>from the definition if any explicit configured executor service.</li>
350         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
351         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
352         *   <li>if none found, then <tt>null</tt> is returned.</li>
353         * </ul>
354         * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
355         * configured executor services in the same coherent way.
356         *
357         * @param routeContext   the route context
358         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
359         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
360         * @param definition     the node definition which may leverage executor service.
361         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
362         * @return the configured executor service, or <tt>null</tt> if none was configured.
363         * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
364         */
365        public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name,
366                                                                   ExecutorServiceAwareDefinition<?> definition,
367                                                                   boolean useDefault) throws IllegalArgumentException {
368            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
369            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
370    
371            // prefer to use explicit configured executor on the definition
372            if (definition.getExecutorService() != null) {
373                return definition.getExecutorService();
374            } else if (definition.getExecutorServiceRef() != null) {
375                // lookup in registry first and use existing thread pool if exists
376                ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
377                if (answer == null) {
378                    throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
379                }
380                return answer;
381            } else if (useDefault) {
382                return manager.newDefaultThreadPool(definition, name);
383            }
384    
385            return null;
386        }
387    
388        /**
389         * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given
390         * <tt>executorServiceRef</tt> name.
391         * <p/>
392         * This method will lookup for configured thread pool in the following order
393         * <ul>
394         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
395         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
396         *   <li>if none found, then <tt>null</tt> is returned.</li>
397         * </ul>
398         * @param routeContext   the route context
399         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
400         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
401         * @param source         the source to use the thread pool
402         * @param executorServiceRef reference name of the thread pool
403         * @return the executor service, or <tt>null</tt> if none was found.
404         */
405        public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name,
406                                                                                 Object source, String executorServiceRef) {
407    
408            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
409            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
410            ObjectHelper.notNull(executorServiceRef, "executorServiceRef");
411    
412            // lookup in registry first and use existing thread pool if exists
413            ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class);
414            if (answer == null) {
415                // then create a thread pool assuming the ref is a thread pool profile id
416                answer = manager.newScheduledThreadPool(source, name, executorServiceRef);
417            }
418            return answer;
419        }
420    
421        /**
422         * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition.
423         * <p/>
424         * This method will lookup for configured thread pool in the following order
425         * <ul>
426         *   <li>from the definition if any explicit configured executor service.</li>
427         *   <li>from the {@link org.apache.camel.spi.Registry} if found</li>
428         *   <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li>
429         *   <li>if none found, then <tt>null</tt> is returned.</li>
430         * </ul>
431         * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support
432         * configured executor services in the same coherent way.
433         *
434         * @param routeContext   the rout context
435         * @param name           name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService}
436         *                       is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}.
437         * @param definition     the node definition which may leverage executor service.
438         * @param useDefault     whether to fallback and use a default thread pool, if no explicit configured
439         * @return the configured executor service, or <tt>null</tt> if none was configured.
440         * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type,
441         * or lookup of executor service in {@link org.apache.camel.spi.Registry} was not found
442         */
443        public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name,
444                                                                   ExecutorServiceAwareDefinition<?> definition,
445                                                                   boolean useDefault) throws IllegalArgumentException {
446            ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
447            ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext());
448    
449            // prefer to use explicit configured executor on the definition
450            if (definition.getExecutorService() != null) {
451                ExecutorService executorService = definition.getExecutorService();
452                if (executorService instanceof ScheduledExecutorService) {
453                    return (ScheduledExecutorService) executorService;
454                }
455                throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance");
456            } else if (definition.getExecutorServiceRef() != null) {
457                ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef());
458                if (answer == null) {
459                    throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile.");
460                }
461                return answer;
462            } else if (useDefault) {
463                return manager.newDefaultScheduledThreadPool(definition, name);
464            }
465    
466            return null;
467        }
468    
469    }