001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 import java.util.concurrent.TimeUnit; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlRootElement; 028 import javax.xml.bind.annotation.XmlTransient; 029 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; 030 031 import org.apache.camel.Processor; 032 import org.apache.camel.ThreadPoolRejectedPolicy; 033 import org.apache.camel.builder.ThreadPoolProfileBuilder; 034 import org.apache.camel.builder.xml.TimeUnitAdapter; 035 import org.apache.camel.processor.Pipeline; 036 import org.apache.camel.processor.ThreadsProcessor; 037 import org.apache.camel.spi.ExecutorServiceManager; 038 import org.apache.camel.spi.RouteContext; 039 import org.apache.camel.spi.ThreadPoolProfile; 040 041 /** 042 * Represents an XML <threads/> element 043 * 044 * @version 045 */ 046 @XmlRootElement(name = "threads") 047 @XmlAccessorType(XmlAccessType.FIELD) 048 public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> { 049 050 // TODO: Camel 3.0 Should extend NoOutputDefinition 051 052 @XmlTransient 053 private ExecutorService executorService; 054 @XmlAttribute 055 private String executorServiceRef; 056 @XmlAttribute 057 private Integer poolSize; 058 @XmlAttribute 059 private Integer maxPoolSize; 060 @XmlAttribute 061 private Long keepAliveTime; 062 @XmlAttribute 063 @XmlJavaTypeAdapter(TimeUnitAdapter.class) 064 private TimeUnit timeUnit; 065 @XmlAttribute 066 private Integer maxQueueSize; 067 @XmlAttribute 068 private String threadName; 069 @XmlAttribute 070 private ThreadPoolRejectedPolicy rejectedPolicy; 071 @XmlAttribute 072 private Boolean callerRunsWhenRejected; 073 074 public ThreadsDefinition() { 075 this.threadName = "Threads"; 076 } 077 078 @Override 079 public Processor createProcessor(RouteContext routeContext) throws Exception { 080 // the threads name 081 String name = getThreadName() != null ? getThreadName() : "Threads"; 082 // prefer any explicit configured executor service 083 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 084 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); 085 // if no explicit then create from the options 086 if (threadPool == null) { 087 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 088 // create the thread pool using a builder 089 ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name) 090 .poolSize(getPoolSize()) 091 .maxPoolSize(getMaxPoolSize()) 092 .keepAliveTime(getKeepAliveTime(), getTimeUnit()) 093 .maxQueueSize(getMaxQueueSize()) 094 .rejectedPolicy(getRejectedPolicy()) 095 .build(); 096 threadPool = manager.newThreadPool(this, name, profile); 097 shutdownThreadPool = true; 098 } else { 099 if (getThreadName() != null && !getThreadName().equals("Threads")) { 100 throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together."); 101 } 102 if (getPoolSize() != null) { 103 throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together."); 104 } 105 if (getMaxPoolSize() != null) { 106 throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together."); 107 } 108 if (getKeepAliveTime() != null) { 109 throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together."); 110 } 111 if (getMaxQueueSize() != null) { 112 throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together."); 113 } 114 if (getRejectedPolicy() != null) { 115 throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together."); 116 } 117 } 118 119 ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool); 120 if (getCallerRunsWhenRejected() == null) { 121 // should be true by default 122 thread.setCallerRunsWhenRejected(true); 123 } else { 124 thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); 125 } 126 thread.setRejectedPolicy(resolveRejectedPolicy(routeContext)); 127 128 List<Processor> pipe = new ArrayList<Processor>(2); 129 pipe.add(thread); 130 pipe.add(createChildProcessor(routeContext, true)); 131 // wrap in nested pipeline so this appears as one processor 132 // (recipient list definition does this as well) 133 return new Pipeline(routeContext.getCamelContext(), pipe) { 134 @Override 135 public String toString() { 136 return "Threads[" + getOutputs() + "]"; 137 } 138 }; 139 } 140 141 protected ThreadPoolRejectedPolicy resolveRejectedPolicy(RouteContext routeContext) { 142 if (getExecutorServiceRef() != null && getRejectedPolicy() == null) { 143 ThreadPoolProfile threadPoolProfile = routeContext.getCamelContext().getExecutorServiceManager().getThreadPoolProfile(getExecutorServiceRef()); 144 if (threadPoolProfile != null) { 145 return threadPoolProfile.getRejectedPolicy(); 146 } 147 } 148 return getRejectedPolicy(); 149 } 150 151 @Override 152 public String getLabel() { 153 return "threads"; 154 } 155 156 @Override 157 public String getShortName() { 158 return "threads"; 159 } 160 161 @Override 162 public String toString() { 163 return "Threads[" + getOutputs() + "]"; 164 } 165 166 public ThreadsDefinition executorService(ExecutorService executorService) { 167 setExecutorService(executorService); 168 return this; 169 } 170 171 public ThreadsDefinition executorServiceRef(String executorServiceRef) { 172 setExecutorServiceRef(executorServiceRef); 173 return this; 174 } 175 176 /** 177 * Sets the core pool size for the underlying {@link java.util.concurrent.ExecutorService}. 178 * 179 * @param poolSize the core pool size to keep minimum in the pool 180 * @return the builder 181 */ 182 public ThreadsDefinition poolSize(int poolSize) { 183 setPoolSize(poolSize); 184 return this; 185 } 186 187 /** 188 * Sets the maximum pool size for the underlying {@link java.util.concurrent.ExecutorService}. 189 * 190 * @param maxPoolSize the maximum pool size 191 * @return the builder 192 */ 193 public ThreadsDefinition maxPoolSize(int maxPoolSize) { 194 setMaxPoolSize(maxPoolSize); 195 return this; 196 } 197 198 /** 199 * Sets the keep alive time for idle threads 200 * 201 * @param keepAliveTime keep alive time 202 * @return the builder 203 */ 204 public ThreadsDefinition keepAliveTime(long keepAliveTime) { 205 setKeepAliveTime(keepAliveTime); 206 return this; 207 } 208 209 /** 210 * Sets the keep alive time unit. 211 * By default SECONDS is used. 212 * 213 * @param keepAliveTimeUnits time unit 214 * @return the builder 215 */ 216 public ThreadsDefinition timeUnit(TimeUnit keepAliveTimeUnits) { 217 setTimeUnit(keepAliveTimeUnits); 218 return this; 219 } 220 221 /** 222 * Sets the maximum number of tasks in the work queue. 223 * <p/> 224 * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue 225 * 226 * @param maxQueueSize the max queue size 227 * @return the builder 228 */ 229 public ThreadsDefinition maxQueueSize(int maxQueueSize) { 230 setMaxQueueSize(maxQueueSize); 231 return this; 232 } 233 234 /** 235 * Sets the handler for tasks which cannot be executed by the thread pool. 236 * 237 * @param rejectedPolicy the policy for the handler 238 * @return the builder 239 */ 240 public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 241 setRejectedPolicy(rejectedPolicy); 242 return this; 243 } 244 245 /** 246 * Sets the thread name to use. 247 * 248 * @param threadName the thread name 249 * @return the builder 250 */ 251 public ThreadsDefinition threadName(String threadName) { 252 setThreadName(threadName); 253 return this; 254 } 255 256 /** 257 * Whether or not the caller should run the task when it was rejected by the thread pool. 258 * <p/> 259 * Is by default <tt>true</tt> 260 * 261 * @param callerRunsWhenRejected whether or not the caller should run 262 * @return the builder 263 */ 264 public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 265 setCallerRunsWhenRejected(callerRunsWhenRejected); 266 return this; 267 } 268 269 public ExecutorService getExecutorService() { 270 return executorService; 271 } 272 273 public void setExecutorService(ExecutorService executorService) { 274 this.executorService = executorService; 275 } 276 277 public String getExecutorServiceRef() { 278 return executorServiceRef; 279 } 280 281 public void setExecutorServiceRef(String executorServiceRef) { 282 this.executorServiceRef = executorServiceRef; 283 } 284 285 public Integer getPoolSize() { 286 return poolSize; 287 } 288 289 public void setPoolSize(Integer poolSize) { 290 this.poolSize = poolSize; 291 } 292 293 public Integer getMaxPoolSize() { 294 return maxPoolSize; 295 } 296 297 public void setMaxPoolSize(Integer maxPoolSize) { 298 this.maxPoolSize = maxPoolSize; 299 } 300 301 public Long getKeepAliveTime() { 302 return keepAliveTime; 303 } 304 305 public void setKeepAliveTime(Long keepAliveTime) { 306 this.keepAliveTime = keepAliveTime; 307 } 308 309 public TimeUnit getTimeUnit() { 310 return timeUnit; 311 } 312 313 public void setTimeUnit(TimeUnit timeUnit) { 314 this.timeUnit = timeUnit; 315 } 316 317 public Integer getMaxQueueSize() { 318 return maxQueueSize; 319 } 320 321 public void setMaxQueueSize(Integer maxQueueSize) { 322 this.maxQueueSize = maxQueueSize; 323 } 324 325 public String getThreadName() { 326 return threadName; 327 } 328 329 public void setThreadName(String threadName) { 330 this.threadName = threadName; 331 } 332 333 public ThreadPoolRejectedPolicy getRejectedPolicy() { 334 return rejectedPolicy; 335 } 336 337 public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 338 this.rejectedPolicy = rejectedPolicy; 339 } 340 341 public Boolean getCallerRunsWhenRejected() { 342 return callerRunsWhenRejected; 343 } 344 345 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 346 this.callerRunsWhenRejected = callerRunsWhenRejected; 347 } 348 }