001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.NoSuchEndpointException; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Route; 031 import org.apache.camel.ShutdownRoute; 032 import org.apache.camel.ShutdownRunningTask; 033 import org.apache.camel.model.FromDefinition; 034 import org.apache.camel.model.ProcessorDefinition; 035 import org.apache.camel.model.RouteDefinition; 036 import org.apache.camel.processor.CamelInternalProcessor; 037 import org.apache.camel.processor.Pipeline; 038 import org.apache.camel.spi.InterceptStrategy; 039 import org.apache.camel.spi.RouteContext; 040 import org.apache.camel.spi.RoutePolicy; 041 import org.apache.camel.util.CamelContextHelper; 042 import org.apache.camel.util.ObjectHelper; 043 044 /** 045 * The context used to activate new routing rules 046 * 047 * @version 048 */ 049 public class DefaultRouteContext implements RouteContext { 050 private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>(); 051 private final RouteDefinition route; 052 private FromDefinition from; 053 private final Collection<Route> routes; 054 private Endpoint endpoint; 055 private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>(); 056 private CamelContext camelContext; 057 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 058 private InterceptStrategy managedInterceptStrategy; 059 private boolean routeAdded; 060 private Boolean trace; 061 private Boolean messageHistory; 062 private Boolean streamCache; 063 private Boolean handleFault; 064 private Long delay; 065 private Boolean autoStartup = Boolean.TRUE; 066 private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>(); 067 private ShutdownRoute shutdownRoute; 068 private ShutdownRunningTask shutdownRunningTask; 069 070 public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) { 071 this.camelContext = camelContext; 072 this.route = route; 073 this.from = from; 074 this.routes = routes; 075 } 076 077 /** 078 * Only used for lazy construction from inside ExpressionType 079 */ 080 public DefaultRouteContext(CamelContext camelContext) { 081 this.camelContext = camelContext; 082 this.routes = new ArrayList<Route>(); 083 this.route = new RouteDefinition("temporary"); 084 } 085 086 public Endpoint getEndpoint() { 087 if (endpoint == null) { 088 endpoint = from.resolveEndpoint(this); 089 } 090 return endpoint; 091 } 092 093 public FromDefinition getFrom() { 094 return from; 095 } 096 097 public RouteDefinition getRoute() { 098 return route; 099 } 100 101 public CamelContext getCamelContext() { 102 return camelContext; 103 } 104 105 public Endpoint resolveEndpoint(String uri) { 106 return route.resolveEndpoint(getCamelContext(), uri); 107 } 108 109 public Endpoint resolveEndpoint(String uri, String ref) { 110 Endpoint endpoint = null; 111 if (uri != null) { 112 endpoint = resolveEndpoint(uri); 113 if (endpoint == null) { 114 throw new NoSuchEndpointException(uri); 115 } 116 } 117 if (ref != null) { 118 endpoint = lookup(ref, Endpoint.class); 119 if (endpoint == null) { 120 throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); 121 } 122 // Check the endpoint has the right CamelContext 123 if (!this.getCamelContext().equals(endpoint.getCamelContext())) { 124 throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); 125 } 126 } 127 if (endpoint == null) { 128 throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); 129 } else { 130 return endpoint; 131 } 132 } 133 134 public <T> T lookup(String name, Class<T> type) { 135 return getCamelContext().getRegistry().lookupByNameAndType(name, type); 136 } 137 138 public <T> Map<String, T> lookupByType(Class<T> type) { 139 return getCamelContext().getRegistry().findByTypeWithName(type); 140 } 141 142 @Override 143 public <T> T mandatoryLookup(String name, Class<T> type) { 144 return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type); 145 } 146 147 public void commit() { 148 // now lets turn all of the event driven consumer processors into a single route 149 if (!eventDrivenProcessors.isEmpty()) { 150 Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors); 151 152 String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); 153 154 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 155 CamelInternalProcessor internal = new CamelInternalProcessor(target); 156 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 157 158 // and then in route context so we can keep track which route this is at runtime 159 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(this)); 160 161 // and then optionally add route policy processor if a custom policy is set 162 List<RoutePolicy> routePolicyList = getRoutePolicyList(); 163 if (routePolicyList != null && !routePolicyList.isEmpty()) { 164 for (RoutePolicy policy : routePolicyList) { 165 // add policy as service if we have not already done that (eg possible if two routes have the same service) 166 // this ensures Camel can control the lifecycle of the policy 167 if (!camelContext.hasService(policy)) { 168 try { 169 camelContext.addService(policy); 170 } catch (Exception e) { 171 throw ObjectHelper.wrapRuntimeCamelException(e); 172 } 173 } 174 } 175 176 internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList)); 177 } 178 179 // wrap in route inflight processor to track number of inflight exchanges for the route 180 internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId)); 181 182 // wrap in JMX instrumentation processor that is used for performance stats 183 internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route")); 184 185 // and create the route that wraps the UoW 186 Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal); 187 edcr.getProperties().put(Route.ID_PROPERTY, routeId); 188 edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode())); 189 if (route.getGroup() != null) { 190 edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup()); 191 } 192 193 // after the route is created then set the route on the policy processor so we get hold of it 194 CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class); 195 if (task != null) { 196 task.setRoute(edcr); 197 } 198 199 // invoke init on route policy 200 if (routePolicyList != null && !routePolicyList.isEmpty()) { 201 for (RoutePolicy policy : routePolicyList) { 202 policy.onInit(edcr); 203 } 204 } 205 206 routes.add(edcr); 207 } 208 } 209 210 public void addEventDrivenProcessor(Processor processor) { 211 eventDrivenProcessors.add(processor); 212 } 213 214 public List<InterceptStrategy> getInterceptStrategies() { 215 return interceptStrategies; 216 } 217 218 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { 219 this.interceptStrategies = interceptStrategies; 220 } 221 222 public void addInterceptStrategy(InterceptStrategy interceptStrategy) { 223 getInterceptStrategies().add(interceptStrategy); 224 } 225 226 public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) { 227 this.managedInterceptStrategy = interceptStrategy; 228 } 229 230 public InterceptStrategy getManagedInterceptStrategy() { 231 return managedInterceptStrategy; 232 } 233 234 public boolean isRouteAdded() { 235 return routeAdded; 236 } 237 238 public void setIsRouteAdded(boolean routeAdded) { 239 this.routeAdded = routeAdded; 240 } 241 242 public void setTracing(Boolean tracing) { 243 this.trace = tracing; 244 } 245 246 public Boolean isTracing() { 247 if (trace != null) { 248 return trace; 249 } else { 250 // fallback to the option from camel context 251 return getCamelContext().isTracing(); 252 } 253 } 254 255 public void setMessageHistory(Boolean messageHistory) { 256 this.messageHistory = messageHistory; 257 } 258 259 public Boolean isMessageHistory() { 260 if (messageHistory != null) { 261 return messageHistory; 262 } else { 263 // fallback to the option from camel context 264 return getCamelContext().isMessageHistory(); 265 } 266 } 267 268 public void setStreamCaching(Boolean cache) { 269 this.streamCache = cache; 270 } 271 272 public Boolean isStreamCaching() { 273 if (streamCache != null) { 274 return streamCache; 275 } else { 276 // fallback to the option from camel context 277 return getCamelContext().isStreamCaching(); 278 } 279 } 280 281 public void setHandleFault(Boolean handleFault) { 282 this.handleFault = handleFault; 283 } 284 285 public Boolean isHandleFault() { 286 if (handleFault != null) { 287 return handleFault; 288 } else { 289 // fallback to the option from camel context 290 return getCamelContext().isHandleFault(); 291 } 292 } 293 294 public void setDelayer(Long delay) { 295 this.delay = delay; 296 } 297 298 public Long getDelayer() { 299 if (delay != null) { 300 return delay; 301 } else { 302 // fallback to the option from camel context 303 return getCamelContext().getDelayer(); 304 } 305 } 306 307 public void setAutoStartup(Boolean autoStartup) { 308 this.autoStartup = autoStartup; 309 } 310 311 public Boolean isAutoStartup() { 312 if (autoStartup != null) { 313 return autoStartup; 314 } 315 // default to true 316 return true; 317 } 318 319 public void setShutdownRoute(ShutdownRoute shutdownRoute) { 320 this.shutdownRoute = shutdownRoute; 321 } 322 323 public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) { 324 throw new IllegalArgumentException("This option can only be configured on CamelContext"); 325 } 326 327 public Boolean isAllowUseOriginalMessage() { 328 return getCamelContext().isAllowUseOriginalMessage(); 329 } 330 331 public ShutdownRoute getShutdownRoute() { 332 if (shutdownRoute != null) { 333 return shutdownRoute; 334 } else { 335 // fallback to the option from camel context 336 return getCamelContext().getShutdownRoute(); 337 } 338 } 339 340 public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) { 341 this.shutdownRunningTask = shutdownRunningTask; 342 } 343 344 public ShutdownRunningTask getShutdownRunningTask() { 345 if (shutdownRunningTask != null) { 346 return shutdownRunningTask; 347 } else { 348 // fallback to the option from camel context 349 return getCamelContext().getShutdownRunningTask(); 350 } 351 } 352 353 public int getAndIncrement(ProcessorDefinition<?> node) { 354 AtomicInteger count = nodeIndex.get(node); 355 if (count == null) { 356 count = new AtomicInteger(); 357 nodeIndex.put(node, count); 358 } 359 return count.getAndIncrement(); 360 } 361 362 public void setRoutePolicyList(List<RoutePolicy> routePolicyList) { 363 this.routePolicyList = routePolicyList; 364 } 365 366 public List<RoutePolicy> getRoutePolicyList() { 367 return routePolicyList; 368 } 369 }