001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.NoSuchEndpointException;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Route;
031    import org.apache.camel.ShutdownRoute;
032    import org.apache.camel.ShutdownRunningTask;
033    import org.apache.camel.model.FromDefinition;
034    import org.apache.camel.model.ProcessorDefinition;
035    import org.apache.camel.model.RouteDefinition;
036    import org.apache.camel.processor.CamelInternalProcessor;
037    import org.apache.camel.processor.Pipeline;
038    import org.apache.camel.spi.InterceptStrategy;
039    import org.apache.camel.spi.RouteContext;
040    import org.apache.camel.spi.RoutePolicy;
041    import org.apache.camel.util.CamelContextHelper;
042    import org.apache.camel.util.ObjectHelper;
043    
044    /**
045     * The context used to activate new routing rules
046     *
047     * @version 
048     */
049    public class DefaultRouteContext implements RouteContext {
050        private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
051        private final RouteDefinition route;
052        private FromDefinition from;
053        private final Collection<Route> routes;
054        private Endpoint endpoint;
055        private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
056        private CamelContext camelContext;
057        private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
058        private InterceptStrategy managedInterceptStrategy;
059        private boolean routeAdded;
060        private Boolean trace;
061        private Boolean messageHistory;
062        private Boolean streamCache;
063        private Boolean handleFault;
064        private Long delay;
065        private Boolean autoStartup = Boolean.TRUE;
066        private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
067        private ShutdownRoute shutdownRoute;
068        private ShutdownRunningTask shutdownRunningTask;
069    
070        public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
071            this.camelContext = camelContext;
072            this.route = route;
073            this.from = from;
074            this.routes = routes;
075        }
076    
077        /**
078         * Only used for lazy construction from inside ExpressionType
079         */
080        public DefaultRouteContext(CamelContext camelContext) {
081            this.camelContext = camelContext;
082            this.routes = new ArrayList<Route>();
083            this.route = new RouteDefinition("temporary");
084        }
085    
086        public Endpoint getEndpoint() {
087            if (endpoint == null) {
088                endpoint = from.resolveEndpoint(this);
089            }
090            return endpoint;
091        }
092    
093        public FromDefinition getFrom() {
094            return from;
095        }
096    
097        public RouteDefinition getRoute() {
098            return route;
099        }
100    
101        public CamelContext getCamelContext() {
102            return camelContext;
103        }
104    
105        public Endpoint resolveEndpoint(String uri) {
106            return route.resolveEndpoint(getCamelContext(), uri);
107        }
108    
109        public Endpoint resolveEndpoint(String uri, String ref) {
110            Endpoint endpoint = null;
111            if (uri != null) {
112                endpoint = resolveEndpoint(uri);
113                if (endpoint == null) {
114                    throw new NoSuchEndpointException(uri);
115                }
116            }
117            if (ref != null) {
118                endpoint = lookup(ref, Endpoint.class);
119                if (endpoint == null) {
120                    throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
121                }
122                // Check the endpoint has the right CamelContext 
123                if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
124                    throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
125                }
126            }
127            if (endpoint == null) {
128                throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
129            } else {
130                return endpoint;
131            }
132        }
133    
134        public <T> T lookup(String name, Class<T> type) {
135            return getCamelContext().getRegistry().lookupByNameAndType(name, type);
136        }
137    
138        public <T> Map<String, T> lookupByType(Class<T> type) {
139            return getCamelContext().getRegistry().findByTypeWithName(type);
140        }
141    
142        @Override
143        public <T> T mandatoryLookup(String name, Class<T> type) {
144            return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
145        }
146    
147        public void commit() {
148            // now lets turn all of the event driven consumer processors into a single route
149            if (!eventDrivenProcessors.isEmpty()) {
150                Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
151    
152                String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
153    
154                // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
155                CamelInternalProcessor internal = new CamelInternalProcessor(target);
156                internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
157    
158                // and then in route context so we can keep track which route this is at runtime
159                internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(this));
160    
161                // and then optionally add route policy processor if a custom policy is set
162                List<RoutePolicy> routePolicyList = getRoutePolicyList();
163                if (routePolicyList != null && !routePolicyList.isEmpty()) {
164                    for (RoutePolicy policy : routePolicyList) {
165                        // add policy as service if we have not already done that (eg possible if two routes have the same service)
166                        // this ensures Camel can control the lifecycle of the policy
167                        if (!camelContext.hasService(policy)) {
168                            try {
169                                camelContext.addService(policy);
170                            } catch (Exception e) {
171                                throw ObjectHelper.wrapRuntimeCamelException(e);
172                            }
173                        }
174                    }
175    
176                    internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList));
177                }
178    
179                // wrap in route inflight processor to track number of inflight exchanges for the route
180                internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));
181    
182                // wrap in JMX instrumentation processor that is used for performance stats
183                internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route"));
184    
185                // and create the route that wraps the UoW
186                Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
187                edcr.getProperties().put(Route.ID_PROPERTY, routeId);
188                edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
189                if (route.getGroup() != null) {
190                    edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
191                }
192    
193                // after the route is created then set the route on the policy processor so we get hold of it
194                CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
195                if (task != null) {
196                    task.setRoute(edcr);
197                }
198    
199                // invoke init on route policy
200                if (routePolicyList != null && !routePolicyList.isEmpty()) {
201                    for (RoutePolicy policy : routePolicyList) {
202                        policy.onInit(edcr);
203                    }
204                }
205    
206                routes.add(edcr);
207            }
208        }
209    
210        public void addEventDrivenProcessor(Processor processor) {
211            eventDrivenProcessors.add(processor);
212        }
213    
214        public List<InterceptStrategy> getInterceptStrategies() {
215            return interceptStrategies;
216        }
217    
218        public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
219            this.interceptStrategies = interceptStrategies;
220        }
221    
222        public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
223            getInterceptStrategies().add(interceptStrategy);
224        }
225    
226        public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
227            this.managedInterceptStrategy = interceptStrategy;
228        }
229    
230        public InterceptStrategy getManagedInterceptStrategy() {
231            return managedInterceptStrategy;
232        }
233    
234        public boolean isRouteAdded() {
235            return routeAdded;
236        }
237    
238        public void setIsRouteAdded(boolean routeAdded) {
239            this.routeAdded = routeAdded;
240        }
241    
242        public void setTracing(Boolean tracing) {
243            this.trace = tracing;
244        }
245    
246        public Boolean isTracing() {
247            if (trace != null) {
248                return trace;
249            } else {
250                // fallback to the option from camel context
251                return getCamelContext().isTracing();
252            }
253        }
254    
255        public void setMessageHistory(Boolean messageHistory) {
256            this.messageHistory = messageHistory;
257        }
258    
259        public Boolean isMessageHistory() {
260            if (messageHistory != null) {
261                return messageHistory;
262            } else {
263                // fallback to the option from camel context
264                return getCamelContext().isMessageHistory();
265            }
266        }
267    
268        public void setStreamCaching(Boolean cache) {
269            this.streamCache = cache;
270        }
271    
272        public Boolean isStreamCaching() {
273            if (streamCache != null) {
274                return streamCache;
275            } else {
276                // fallback to the option from camel context
277                return getCamelContext().isStreamCaching();
278            }
279        }
280    
281        public void setHandleFault(Boolean handleFault) {
282            this.handleFault = handleFault;
283        }
284    
285        public Boolean isHandleFault() {
286            if (handleFault != null) {
287                return handleFault;
288            } else {
289                // fallback to the option from camel context
290                return getCamelContext().isHandleFault();
291            }
292        }
293    
294        public void setDelayer(Long delay) {
295            this.delay = delay;
296        }
297    
298        public Long getDelayer() {
299            if (delay != null) {
300                return delay;
301            } else {
302                // fallback to the option from camel context
303                return getCamelContext().getDelayer();
304            }
305        }
306    
307        public void setAutoStartup(Boolean autoStartup) {
308            this.autoStartup = autoStartup;
309        }
310    
311        public Boolean isAutoStartup() {
312            if (autoStartup != null) {
313                return autoStartup;
314            }
315            // default to true
316            return true;
317        }
318    
319        public void setShutdownRoute(ShutdownRoute shutdownRoute) {
320            this.shutdownRoute = shutdownRoute;
321        }
322    
323        public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) {
324            throw new IllegalArgumentException("This option can only be configured on CamelContext");
325        }
326    
327        public Boolean isAllowUseOriginalMessage() {
328            return getCamelContext().isAllowUseOriginalMessage();
329        }
330    
331        public ShutdownRoute getShutdownRoute() {
332            if (shutdownRoute != null) {
333                return shutdownRoute;
334            } else {
335                // fallback to the option from camel context
336                return getCamelContext().getShutdownRoute();
337            }
338        }
339    
340        public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
341            this.shutdownRunningTask = shutdownRunningTask;
342        }
343    
344        public ShutdownRunningTask getShutdownRunningTask() {
345            if (shutdownRunningTask != null) {
346                return shutdownRunningTask;
347            } else {
348                // fallback to the option from camel context
349                return getCamelContext().getShutdownRunningTask();
350            }
351        }
352        
353        public int getAndIncrement(ProcessorDefinition<?> node) {
354            AtomicInteger count = nodeIndex.get(node);
355            if (count == null) {
356                count = new AtomicInteger();
357                nodeIndex.put(node, count);
358            }
359            return count.getAndIncrement();
360        }
361    
362        public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
363            this.routePolicyList = routePolicyList;
364        }
365    
366        public List<RoutePolicy> getRoutePolicyList() {
367            return routePolicyList;
368        }
369    }