001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.LinkedHashSet; 022 import java.util.List; 023 import java.util.Set; 024 import java.util.concurrent.ExecutorService; 025 import java.util.concurrent.ScheduledExecutorService; 026 027 import org.apache.camel.spi.ExecutorServiceManager; 028 import org.apache.camel.spi.RouteContext; 029 import org.apache.camel.util.ObjectHelper; 030 031 /** 032 * Helper class for ProcessorDefinition and the other model classes. 033 */ 034 public final class ProcessorDefinitionHelper { 035 036 private ProcessorDefinitionHelper() { 037 } 038 039 /** 040 * Looks for the given type in the list of outputs and recurring all the children as well. 041 * 042 * @param outputs list of outputs, can be null or empty. 043 * @param type the type to look for 044 * @return the found definitions, or <tt>null</tt> if not found 045 */ 046 public static <T> Iterator<T> filterTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) { 047 List<T> found = new ArrayList<T>(); 048 doFindType(outputs, type, found); 049 return found.iterator(); 050 } 051 052 /** 053 * Looks for the given type in the list of outputs and recurring all the children as well. 054 * Will stop at first found and return it. 055 * 056 * @param outputs list of outputs, can be null or empty. 057 * @param type the type to look for 058 * @return the first found type, or <tt>null</tt> if not found 059 */ 060 public static <T> T findFirstTypeInOutputs(List<ProcessorDefinition<?>> outputs, Class<T> type) { 061 List<T> found = new ArrayList<T>(); 062 doFindType(outputs, type, found); 063 if (found.isEmpty()) { 064 return null; 065 } 066 return found.iterator().next(); 067 } 068 069 /** 070 * Is the given child the first in the outputs from the parent? 071 * 072 * @param parentType the type the parent must be 073 * @param node the node 074 * @return <tt>true</tt> if first child, <tt>false</tt> otherwise 075 */ 076 public static boolean isFirstChildOfType(Class<?> parentType, ProcessorDefinition<?> node) { 077 if (node == null || node.getParent() == null) { 078 return false; 079 } 080 081 if (node.getParent().getOutputs().isEmpty()) { 082 return false; 083 } 084 085 if (!(node.getParent().getClass().equals(parentType))) { 086 return false; 087 } 088 089 return node.getParent().getOutputs().get(0).equals(node); 090 } 091 092 /** 093 * Is the given node parent(s) of the given type 094 * @param parentType the parent type 095 * @param node the current node 096 * @param recursive whether or not to check grand parent(s) as well 097 * @return <tt>true</tt> if parent(s) is of given type, <tt>false</tt> otherwise 098 */ 099 public static boolean isParentOfType(Class<?> parentType, ProcessorDefinition<?> node, boolean recursive) { 100 if (node == null || node.getParent() == null) { 101 return false; 102 } 103 104 if (parentType.isAssignableFrom(node.getParent().getClass())) { 105 return true; 106 } else if (recursive) { 107 // recursive up the tree of parents 108 return isParentOfType(parentType, node.getParent(), true); 109 } else { 110 // no match 111 return false; 112 } 113 } 114 115 /** 116 * Gets the route definition the given node belongs to. 117 * 118 * @param node the node 119 * @return the route, or <tt>null</tt> if not possible to find 120 */ 121 public static RouteDefinition getRoute(ProcessorDefinition<?> node) { 122 if (node == null) { 123 return null; 124 } 125 126 ProcessorDefinition<?> def = node; 127 // drill to the top 128 while (def != null && def.getParent() != null) { 129 def = def.getParent(); 130 } 131 132 if (def instanceof RouteDefinition) { 133 return (RouteDefinition) def; 134 } else { 135 // not found 136 return null; 137 } 138 } 139 140 /** 141 * Gets the route id the given node belongs to. 142 * 143 * @param node the node 144 * @return the route id, or <tt>null</tt> if not possible to find 145 */ 146 public static String getRouteId(ProcessorDefinition<?> node) { 147 RouteDefinition route = getRoute(node); 148 return route != null ? route.getId() : null; 149 } 150 151 /** 152 * Traverses the node, including its children (recursive), and gathers all the node ids. 153 * 154 * @param node the target node 155 * @param set set to store ids, if <tt>null</tt> a new set will be created 156 * @param onlyCustomId whether to only store custom assigned ids (ie. {@link org.apache.camel.model.OptionalIdentifiedDefinition#hasCustomIdAssigned()} 157 * @param includeAbstract whether to include abstract nodes (ie. {@link org.apache.camel.model.ProcessorDefinition#isAbstract()} 158 * @return the set with the found ids. 159 */ 160 public static Set<String> gatherAllNodeIds(ProcessorDefinition<?> node, Set<String> set, 161 boolean onlyCustomId, boolean includeAbstract) { 162 if (node == null) { 163 return set; 164 } 165 166 // skip abstract 167 if (node.isAbstract() && !includeAbstract) { 168 return set; 169 } 170 171 if (set == null) { 172 set = new LinkedHashSet<String>(); 173 } 174 175 // add ourselves 176 if (node.getId() != null) { 177 if (!onlyCustomId || node.hasCustomIdAssigned() && onlyCustomId) { 178 set.add(node.getId()); 179 } 180 } 181 182 // traverse outputs and recursive children as well 183 List<ProcessorDefinition<?>> children = node.getOutputs(); 184 if (children != null && !children.isEmpty()) { 185 for (ProcessorDefinition<?> child : children) { 186 // traverse children also 187 gatherAllNodeIds(child, set, onlyCustomId, includeAbstract); 188 } 189 } 190 191 return set; 192 } 193 194 @SuppressWarnings({"unchecked", "rawtypes"}) 195 private static <T> void doFindType(List<ProcessorDefinition<?>> outputs, Class<T> type, List<T> found) { 196 if (outputs == null || outputs.isEmpty()) { 197 return; 198 } 199 200 for (ProcessorDefinition out : outputs) { 201 if (type.isInstance(out)) { 202 found.add((T)out); 203 } 204 205 // send is much common 206 if (out instanceof SendDefinition) { 207 SendDefinition send = (SendDefinition) out; 208 List<ProcessorDefinition<?>> children = send.getOutputs(); 209 doFindType(children, type, found); 210 } 211 212 // special for choice 213 if (out instanceof ChoiceDefinition) { 214 ChoiceDefinition choice = (ChoiceDefinition) out; 215 for (WhenDefinition when : choice.getWhenClauses()) { 216 List<ProcessorDefinition<?>> children = when.getOutputs(); 217 doFindType(children, type, found); 218 } 219 220 // otherwise is optional 221 if (choice.getOtherwise() != null) { 222 List<ProcessorDefinition<?>> children = choice.getOtherwise().getOutputs(); 223 doFindType(children, type, found); 224 } 225 } 226 227 // special for try ... catch ... finally 228 if (out instanceof TryDefinition) { 229 TryDefinition doTry = (TryDefinition) out; 230 List<ProcessorDefinition<?>> doTryOut = doTry.getOutputsWithoutCatches(); 231 doFindType(doTryOut, type, found); 232 233 List<CatchDefinition> doTryCatch = doTry.getCatchClauses(); 234 for (CatchDefinition doCatch : doTryCatch) { 235 doFindType(doCatch.getOutputs(), type, found); 236 } 237 238 if (doTry.getFinallyClause() != null) { 239 doFindType(doTry.getFinallyClause().getOutputs(), type, found); 240 } 241 242 // do not check children as we already did that 243 continue; 244 } 245 246 // try children as well 247 List<ProcessorDefinition<?>> children = out.getOutputs(); 248 doFindType(children, type, found); 249 } 250 } 251 252 /** 253 * Is there any outputs in the given list. 254 * <p/> 255 * Is used for check if the route output has any real outputs (non abstracts) 256 * 257 * @param outputs the outputs 258 * @param excludeAbstract whether or not to exclude abstract outputs (e.g. skip onException etc.) 259 * @return <tt>true</tt> if has outputs, otherwise <tt>false</tt> is returned 260 */ 261 @SuppressWarnings({"unchecked", "rawtypes"}) 262 public static boolean hasOutputs(List<ProcessorDefinition<?>> outputs, boolean excludeAbstract) { 263 if (outputs == null || outputs.isEmpty()) { 264 return false; 265 } 266 if (!excludeAbstract) { 267 return !outputs.isEmpty(); 268 } 269 for (ProcessorDefinition output : outputs) { 270 if (output instanceof TransactedDefinition || output instanceof PolicyDefinition) { 271 // special for those as they wrap entire output, so we should just check its output 272 return hasOutputs(output.getOutputs(), excludeAbstract); 273 } 274 if (!output.isAbstract()) { 275 return true; 276 } 277 } 278 return false; 279 } 280 281 /** 282 * Determines whether a new thread pool will be created or not. 283 * <p/> 284 * This is used to know if a new thread pool will be created, and therefore is not shared by others, and therefore 285 * exclusive to the definition. 286 * 287 * @param routeContext the route context 288 * @param definition the node definition which may leverage executor service. 289 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 290 * @return <tt>true</tt> if a new thread pool will be created, <tt>false</tt> if not 291 * @see #getConfiguredExecutorService(org.apache.camel.spi.RouteContext, String, ExecutorServiceAwareDefinition, boolean) 292 */ 293 public static boolean willCreateNewThreadPool(RouteContext routeContext, ExecutorServiceAwareDefinition<?> definition, boolean useDefault) { 294 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 295 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 296 297 if (definition.getExecutorService() != null) { 298 // no there is a custom thread pool configured 299 return false; 300 } else if (definition.getExecutorServiceRef() != null) { 301 ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(definition.getExecutorServiceRef(), ExecutorService.class); 302 // if no existing thread pool, then we will have to create a new thread pool 303 return answer == null; 304 } else if (useDefault) { 305 return true; 306 } 307 308 return false; 309 } 310 311 /** 312 * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ExecutorService} registered with the given 313 * <tt>executorServiceRef</tt> name. 314 * <p/> 315 * This method will lookup for configured thread pool in the following order 316 * <ul> 317 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 318 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 319 * <li>if none found, then <tt>null</tt> is returned.</li> 320 * </ul> 321 * @param routeContext the route context 322 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 323 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 324 * @param source the source to use the thread pool 325 * @param executorServiceRef reference name of the thread pool 326 * @return the executor service, or <tt>null</tt> if none was found. 327 */ 328 public static ExecutorService lookupExecutorServiceRef(RouteContext routeContext, String name, 329 Object source, String executorServiceRef) { 330 331 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 332 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 333 ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); 334 335 // lookup in registry first and use existing thread pool if exists 336 ExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ExecutorService.class); 337 if (answer == null) { 338 // then create a thread pool assuming the ref is a thread pool profile id 339 answer = manager.newThreadPool(source, name, executorServiceRef); 340 } 341 return answer; 342 } 343 344 /** 345 * Will lookup and get the configured {@link java.util.concurrent.ExecutorService} from the given definition. 346 * <p/> 347 * This method will lookup for configured thread pool in the following order 348 * <ul> 349 * <li>from the definition if any explicit configured executor service.</li> 350 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 351 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 352 * <li>if none found, then <tt>null</tt> is returned.</li> 353 * </ul> 354 * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support 355 * configured executor services in the same coherent way. 356 * 357 * @param routeContext the route context 358 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 359 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 360 * @param definition the node definition which may leverage executor service. 361 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 362 * @return the configured executor service, or <tt>null</tt> if none was configured. 363 * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found 364 */ 365 public static ExecutorService getConfiguredExecutorService(RouteContext routeContext, String name, 366 ExecutorServiceAwareDefinition<?> definition, 367 boolean useDefault) throws IllegalArgumentException { 368 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 369 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 370 371 // prefer to use explicit configured executor on the definition 372 if (definition.getExecutorService() != null) { 373 return definition.getExecutorService(); 374 } else if (definition.getExecutorServiceRef() != null) { 375 // lookup in registry first and use existing thread pool if exists 376 ExecutorService answer = lookupExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); 377 if (answer == null) { 378 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); 379 } 380 return answer; 381 } else if (useDefault) { 382 return manager.newDefaultThreadPool(definition, name); 383 } 384 385 return null; 386 } 387 388 /** 389 * Will lookup in {@link org.apache.camel.spi.Registry} for a {@link ScheduledExecutorService} registered with the given 390 * <tt>executorServiceRef</tt> name. 391 * <p/> 392 * This method will lookup for configured thread pool in the following order 393 * <ul> 394 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 395 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 396 * <li>if none found, then <tt>null</tt> is returned.</li> 397 * </ul> 398 * @param routeContext the route context 399 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 400 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 401 * @param source the source to use the thread pool 402 * @param executorServiceRef reference name of the thread pool 403 * @return the executor service, or <tt>null</tt> if none was found. 404 */ 405 public static ScheduledExecutorService lookupScheduledExecutorServiceRef(RouteContext routeContext, String name, 406 Object source, String executorServiceRef) { 407 408 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 409 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 410 ObjectHelper.notNull(executorServiceRef, "executorServiceRef"); 411 412 // lookup in registry first and use existing thread pool if exists 413 ScheduledExecutorService answer = routeContext.getCamelContext().getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); 414 if (answer == null) { 415 // then create a thread pool assuming the ref is a thread pool profile id 416 answer = manager.newScheduledThreadPool(source, name, executorServiceRef); 417 } 418 return answer; 419 } 420 421 /** 422 * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition. 423 * <p/> 424 * This method will lookup for configured thread pool in the following order 425 * <ul> 426 * <li>from the definition if any explicit configured executor service.</li> 427 * <li>from the {@link org.apache.camel.spi.Registry} if found</li> 428 * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> 429 * <li>if none found, then <tt>null</tt> is returned.</li> 430 * </ul> 431 * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support 432 * configured executor services in the same coherent way. 433 * 434 * @param routeContext the rout context 435 * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} 436 * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. 437 * @param definition the node definition which may leverage executor service. 438 * @param useDefault whether to fallback and use a default thread pool, if no explicit configured 439 * @return the configured executor service, or <tt>null</tt> if none was configured. 440 * @throws IllegalArgumentException is thrown if the found instance is not a ScheduledExecutorService type, 441 * or lookup of executor service in {@link org.apache.camel.spi.Registry} was not found 442 */ 443 public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name, 444 ExecutorServiceAwareDefinition<?> definition, 445 boolean useDefault) throws IllegalArgumentException { 446 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 447 ObjectHelper.notNull(manager, "ExecutorServiceManager", routeContext.getCamelContext()); 448 449 // prefer to use explicit configured executor on the definition 450 if (definition.getExecutorService() != null) { 451 ExecutorService executorService = definition.getExecutorService(); 452 if (executorService instanceof ScheduledExecutorService) { 453 return (ScheduledExecutorService) executorService; 454 } 455 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); 456 } else if (definition.getExecutorServiceRef() != null) { 457 ScheduledExecutorService answer = lookupScheduledExecutorServiceRef(routeContext, name, definition, definition.getExecutorServiceRef()); 458 if (answer == null) { 459 throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry or as a thread pool profile."); 460 } 461 return answer; 462 } else if (useDefault) { 463 return manager.newDefaultScheduledThreadPool(definition, name); 464 } 465 466 return null; 467 } 468 469 }