001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import javax.xml.bind.annotation.XmlAccessType;
025    import javax.xml.bind.annotation.XmlAccessorType;
026    import javax.xml.bind.annotation.XmlAttribute;
027    import javax.xml.bind.annotation.XmlRootElement;
028    import javax.xml.bind.annotation.XmlTransient;
029    import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
030    
031    import org.apache.camel.Processor;
032    import org.apache.camel.ThreadPoolRejectedPolicy;
033    import org.apache.camel.builder.ThreadPoolProfileBuilder;
034    import org.apache.camel.builder.xml.TimeUnitAdapter;
035    import org.apache.camel.processor.Pipeline;
036    import org.apache.camel.processor.ThreadsProcessor;
037    import org.apache.camel.spi.ExecutorServiceManager;
038    import org.apache.camel.spi.RouteContext;
039    import org.apache.camel.spi.ThreadPoolProfile;
040    
041    /**
042     * Represents an XML <threads/> element
043     *
044     * @version 
045     */
046    @XmlRootElement(name = "threads")
047    @XmlAccessorType(XmlAccessType.FIELD)
048    public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> {
049    
050        // TODO: Camel 3.0 Should extend NoOutputDefinition
051    
052        @XmlTransient
053        private ExecutorService executorService;
054        @XmlAttribute
055        private String executorServiceRef;
056        @XmlAttribute
057        private Integer poolSize;
058        @XmlAttribute
059        private Integer maxPoolSize;
060        @XmlAttribute
061        private Long keepAliveTime;
062        @XmlAttribute
063        @XmlJavaTypeAdapter(TimeUnitAdapter.class)
064        private TimeUnit timeUnit;
065        @XmlAttribute
066        private Integer maxQueueSize;
067        @XmlAttribute
068        private String threadName;
069        @XmlAttribute
070        private ThreadPoolRejectedPolicy rejectedPolicy;
071        @XmlAttribute
072        private Boolean callerRunsWhenRejected;
073        
074        public ThreadsDefinition() {
075            this.threadName =  "Threads";
076        }
077    
078        @Override
079        public Processor createProcessor(RouteContext routeContext) throws Exception {
080            // the threads name
081            String name = getThreadName() != null ? getThreadName() : "Threads";
082            // prefer any explicit configured executor service
083            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
084            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
085            // if no explicit then create from the options
086            if (threadPool == null) {
087                ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
088                // create the thread pool using a builder
089                ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
090                        .poolSize(getPoolSize())
091                        .maxPoolSize(getMaxPoolSize())
092                        .keepAliveTime(getKeepAliveTime(), getTimeUnit())
093                        .maxQueueSize(getMaxQueueSize())
094                        .rejectedPolicy(getRejectedPolicy())
095                        .build();
096                threadPool = manager.newThreadPool(this, name, profile);
097                shutdownThreadPool = true;
098            } else {
099                if (getThreadName() != null && !getThreadName().equals("Threads")) {
100                    throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together.");
101                }
102                if (getPoolSize() != null) {
103                    throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together.");
104                }
105                if (getMaxPoolSize() != null) {
106                    throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together.");
107                }
108                if (getKeepAliveTime() != null) {
109                    throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together.");
110                }
111                if (getMaxQueueSize() != null) {
112                    throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together.");
113                }
114                if (getRejectedPolicy() != null) {
115                    throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together.");
116                }
117            }
118    
119            ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
120            if (getCallerRunsWhenRejected() == null) {
121                // should be true by default
122                thread.setCallerRunsWhenRejected(true);
123            } else {
124                thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
125            }
126            thread.setRejectedPolicy(resolveRejectedPolicy(routeContext));
127    
128            List<Processor> pipe = new ArrayList<Processor>(2);
129            pipe.add(thread);
130            pipe.add(createChildProcessor(routeContext, true));
131            // wrap in nested pipeline so this appears as one processor
132            // (recipient list definition does this as well)
133            return new Pipeline(routeContext.getCamelContext(), pipe) {
134                @Override
135                public String toString() {
136                    return "Threads[" + getOutputs() + "]";
137                }
138            };
139        }
140    
141        protected ThreadPoolRejectedPolicy resolveRejectedPolicy(RouteContext routeContext) {
142            if (getExecutorServiceRef() != null && getRejectedPolicy() == null) {
143                ThreadPoolProfile threadPoolProfile = routeContext.getCamelContext().getExecutorServiceManager().getThreadPoolProfile(getExecutorServiceRef());
144                if (threadPoolProfile != null) {
145                    return threadPoolProfile.getRejectedPolicy();
146                }
147            }
148            return getRejectedPolicy();
149        }
150    
151        @Override
152        public String getLabel() {
153            return "threads";
154        }
155    
156        @Override
157        public String getShortName() {
158            return "threads";
159        }
160    
161        @Override
162        public String toString() {
163            return "Threads[" + getOutputs() + "]";
164        }
165    
166        public ThreadsDefinition executorService(ExecutorService executorService) {
167            setExecutorService(executorService);
168            return this;
169        }
170    
171        public ThreadsDefinition executorServiceRef(String executorServiceRef) {
172            setExecutorServiceRef(executorServiceRef);
173            return this;
174        }
175    
176        /**
177         * Sets the core pool size for the underlying {@link java.util.concurrent.ExecutorService}.
178         *
179         * @param poolSize the core pool size to keep minimum in the pool
180         * @return the builder
181         */
182        public ThreadsDefinition poolSize(int poolSize) {
183            setPoolSize(poolSize);
184            return this;
185        }
186    
187        /**
188         * Sets the maximum pool size for the underlying {@link java.util.concurrent.ExecutorService}.
189         *
190         * @param maxPoolSize the maximum pool size
191         * @return the builder
192         */
193        public ThreadsDefinition maxPoolSize(int maxPoolSize) {
194            setMaxPoolSize(maxPoolSize);
195            return this;
196        }
197    
198        /**
199         * Sets the keep alive time for idle threads
200         *
201         * @param keepAliveTime keep alive time
202         * @return the builder
203         */
204        public ThreadsDefinition keepAliveTime(long keepAliveTime) {
205            setKeepAliveTime(keepAliveTime);
206            return this;
207        }
208    
209        /**
210         * Sets the keep alive time unit.
211         * By default SECONDS is used.
212         *
213         * @param keepAliveTimeUnits time unit
214         * @return the builder
215         */
216        public ThreadsDefinition timeUnit(TimeUnit keepAliveTimeUnits) {
217            setTimeUnit(keepAliveTimeUnits);
218            return this;
219        }
220    
221        /**
222         * Sets the maximum number of tasks in the work queue.
223         * <p/>
224         * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
225         *
226         * @param maxQueueSize the max queue size
227         * @return the builder
228         */
229        public ThreadsDefinition maxQueueSize(int maxQueueSize) {
230            setMaxQueueSize(maxQueueSize);
231            return this;
232        }
233    
234        /**
235         * Sets the handler for tasks which cannot be executed by the thread pool.
236         *
237         * @param rejectedPolicy  the policy for the handler
238         * @return the builder
239         */
240        public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
241            setRejectedPolicy(rejectedPolicy);
242            return this;
243        }
244    
245        /**
246         * Sets the thread name to use.
247         *
248         * @param threadName the thread name
249         * @return the builder
250         */
251        public ThreadsDefinition threadName(String threadName) {
252            setThreadName(threadName);
253            return this;
254        }
255    
256        /**
257         * Whether or not the caller should run the task when it was rejected by the thread pool.
258         * <p/>
259         * Is by default <tt>true</tt>
260         *
261         * @param callerRunsWhenRejected whether or not the caller should run
262         * @return the builder
263         */
264        public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
265            setCallerRunsWhenRejected(callerRunsWhenRejected);
266            return this;
267        }
268    
269        public ExecutorService getExecutorService() {
270            return executorService;
271        }
272    
273        public void setExecutorService(ExecutorService executorService) {
274            this.executorService = executorService;
275        }
276    
277        public String getExecutorServiceRef() {
278            return executorServiceRef;
279        }
280    
281        public void setExecutorServiceRef(String executorServiceRef) {
282            this.executorServiceRef = executorServiceRef;
283        }
284    
285        public Integer getPoolSize() {
286            return poolSize;
287        }
288    
289        public void setPoolSize(Integer poolSize) {
290            this.poolSize = poolSize;
291        }
292    
293        public Integer getMaxPoolSize() {
294            return maxPoolSize;
295        }
296    
297        public void setMaxPoolSize(Integer maxPoolSize) {
298            this.maxPoolSize = maxPoolSize;
299        }
300    
301        public Long getKeepAliveTime() {
302            return keepAliveTime;
303        }
304    
305        public void setKeepAliveTime(Long keepAliveTime) {
306            this.keepAliveTime = keepAliveTime;
307        }
308    
309        public TimeUnit getTimeUnit() {
310            return timeUnit;
311        }
312    
313        public void setTimeUnit(TimeUnit timeUnit) {
314            this.timeUnit = timeUnit;
315        }
316    
317        public Integer getMaxQueueSize() {
318            return maxQueueSize;
319        }
320    
321        public void setMaxQueueSize(Integer maxQueueSize) {
322            this.maxQueueSize = maxQueueSize;
323        }
324    
325        public String getThreadName() {
326            return threadName;
327        }
328    
329        public void setThreadName(String threadName) {
330            this.threadName = threadName;
331        }
332    
333        public ThreadPoolRejectedPolicy getRejectedPolicy() {
334            return rejectedPolicy;
335        }
336    
337        public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
338            this.rejectedPolicy = rejectedPolicy;
339        }
340    
341        public Boolean getCallerRunsWhenRejected() {
342            return callerRunsWhenRejected;
343        }
344    
345        public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
346            this.callerRunsWhenRejected = callerRunsWhenRejected;
347        }
348    }