001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.interceptor; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.List; 022 import java.util.Map; 023 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.CamelContextAware; 027 import org.apache.camel.Channel; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Processor; 030 import org.apache.camel.model.ModelChannel; 031 import org.apache.camel.model.ProcessorDefinition; 032 import org.apache.camel.model.ProcessorDefinitionHelper; 033 import org.apache.camel.model.RouteDefinition; 034 import org.apache.camel.model.RouteDefinitionHelper; 035 import org.apache.camel.processor.CamelInternalProcessor; 036 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge; 037 import org.apache.camel.processor.WrapProcessor; 038 import org.apache.camel.spi.InterceptStrategy; 039 import org.apache.camel.spi.RouteContext; 040 import org.apache.camel.util.OrderedComparator; 041 import org.apache.camel.util.ServiceHelper; 042 import org.slf4j.Logger; 043 import org.slf4j.LoggerFactory; 044 045 /** 046 * DefaultChannel is the default {@link Channel}. 047 * <p/> 048 * The current implementation is just a composite containing the interceptors and error handler 049 * that beforehand was added to the route graph directly. 050 * <br/> 051 * With this {@link Channel} we can in the future implement better strategies for routing the 052 * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node 053 * in the graph. 054 * 055 * @version 056 */ 057 public class DefaultChannel extends CamelInternalProcessor implements ModelChannel { 058 059 private static final Logger LOG = LoggerFactory.getLogger(DefaultChannel.class); 060 061 private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>(); 062 private Processor errorHandler; 063 // the next processor (non wrapped) 064 private Processor nextProcessor; 065 // the real output to invoke that has been wrapped 066 private Processor output; 067 private ProcessorDefinition<?> definition; 068 private ProcessorDefinition<?> childDefinition; 069 private CamelContext camelContext; 070 private RouteContext routeContext; 071 072 public void setNextProcessor(Processor next) { 073 this.nextProcessor = next; 074 } 075 076 public Processor getOutput() { 077 // the errorHandler is already decorated with interceptors 078 // so it contain the entire chain of processors, so we can safely use it directly as output 079 // if no error handler provided we use the output 080 // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the 081 // the error handlers, interceptors, etc. woven in at design time 082 return errorHandler != null ? errorHandler : output; 083 } 084 085 @Override 086 public boolean hasNext() { 087 return nextProcessor != null; 088 } 089 090 @Override 091 public List<Processor> next() { 092 if (!hasNext()) { 093 return null; 094 } 095 List<Processor> answer = new ArrayList<Processor>(1); 096 answer.add(nextProcessor); 097 return answer; 098 } 099 100 public void setOutput(Processor output) { 101 this.output = output; 102 } 103 104 public Processor getNextProcessor() { 105 return nextProcessor; 106 } 107 108 public boolean hasInterceptorStrategy(Class<?> type) { 109 for (InterceptStrategy strategy : interceptors) { 110 if (type.isInstance(strategy)) { 111 return true; 112 } 113 } 114 return false; 115 } 116 117 public void setErrorHandler(Processor errorHandler) { 118 this.errorHandler = errorHandler; 119 } 120 121 public Processor getErrorHandler() { 122 return errorHandler; 123 } 124 125 public void addInterceptStrategy(InterceptStrategy strategy) { 126 interceptors.add(strategy); 127 } 128 129 public void addInterceptStrategies(List<InterceptStrategy> strategies) { 130 interceptors.addAll(strategies); 131 } 132 133 public List<InterceptStrategy> getInterceptStrategies() { 134 return interceptors; 135 } 136 137 public ProcessorDefinition<?> getProcessorDefinition() { 138 return definition; 139 } 140 141 public void setChildDefinition(ProcessorDefinition<?> childDefinition) { 142 this.childDefinition = childDefinition; 143 } 144 145 public RouteContext getRouteContext() { 146 return routeContext; 147 } 148 149 @Override 150 protected void doStart() throws Exception { 151 // the output has now been created, so assign the output as the processor 152 setProcessor(getOutput()); 153 ServiceHelper.startServices(errorHandler, output); 154 } 155 156 @Override 157 protected void doStop() throws Exception { 158 ServiceHelper.stopServices(output, errorHandler); 159 } 160 161 public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { 162 this.routeContext = routeContext; 163 this.definition = outputDefinition; 164 this.camelContext = routeContext.getCamelContext(); 165 166 Processor target = nextProcessor; 167 Processor next; 168 169 // init CamelContextAware as early as possible on target 170 if (target instanceof CamelContextAware) { 171 ((CamelContextAware) target).setCamelContext(camelContext); 172 } 173 174 // the definition to wrap should be the fine grained, 175 // so if a child is set then use it, if not then its the original output used 176 ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition; 177 LOG.debug("Initialize channel for target: '{}'", targetOutputDef); 178 179 // fix parent/child relationship. This will be the case of the routes has been 180 // defined using XML DSL or end user may have manually assembled a route from the model. 181 // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders) 182 // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors 183 // then we need to fix the parent/child relationship beforehand, and thus we can do it here 184 // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0) 185 if (childDefinition != null && outputDefinition != childDefinition) { 186 childDefinition.setParent(outputDefinition); 187 } 188 189 // force the creation of an id 190 RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition); 191 192 // first wrap the output with the managed strategy if any 193 InterceptStrategy managed = routeContext.getManagedInterceptStrategy(); 194 if (managed != null) { 195 next = target == nextProcessor ? null : nextProcessor; 196 target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); 197 } 198 199 // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog) 200 InterceptStrategy tracer = getOrCreateBacklogTracer(); 201 camelContext.addService(tracer); 202 if (tracer instanceof BacklogTracer) { 203 BacklogTracer backlogTracer = (BacklogTracer) tracer; 204 205 RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); 206 boolean first = false; 207 if (route != null && !route.getOutputs().isEmpty()) { 208 first = route.getOutputs().get(0) == definition; 209 } 210 211 addAdvice(new BacklogTracerAdvice(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); 212 213 // add debugger as well so we have both tracing and debugging out of the box 214 InterceptStrategy debugger = getOrCreateBacklogDebugger(); 215 camelContext.addService(debugger); 216 if (debugger instanceof BacklogDebugger) { 217 BacklogDebugger backlogDebugger = (BacklogDebugger) debugger; 218 addAdvice(new BacklogDebuggerAdvice(backlogDebugger, target, targetOutputDef)); 219 } 220 } 221 222 if (routeContext.isMessageHistory()) { 223 // add message history advice 224 addAdvice(new MessageHistoryAdvice(targetOutputDef)); 225 } 226 227 // the regular tracer is not a task on internalProcessor as this is not really needed 228 // end users have to explicit enable the tracer to use it, and then its okay if we wrap 229 // the processors (but by default tracer is disabled, and therefore we do not wrap processors) 230 tracer = getOrCreateTracer(); 231 camelContext.addService(tracer); 232 if (tracer != null) { 233 TraceInterceptor trace = (TraceInterceptor) tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); 234 // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly 235 trace.setRouteContext(routeContext); 236 target = trace; 237 } 238 239 // sort interceptors according to ordered 240 Collections.sort(interceptors, new OrderedComparator()); 241 // then reverse list so the first will be wrapped last, as it would then be first being invoked 242 Collections.reverse(interceptors); 243 // wrap the output with the configured interceptors 244 for (InterceptStrategy strategy : interceptors) { 245 next = target == nextProcessor ? null : nextProcessor; 246 // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy 247 if (strategy instanceof Tracer) { 248 continue; 249 } 250 // skip stream caching as it must be wrapped as outer most, which we do later 251 if (strategy instanceof StreamCaching) { 252 continue; 253 } 254 // use the fine grained definition (eg the child if available). Its always possible to get back to the parent 255 Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); 256 if (!(wrapped instanceof AsyncProcessor)) { 257 LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance." 258 + " This causes the asynchronous routing engine to not work as optimal as possible." 259 + " See more details at the InterceptStrategy javadoc." 260 + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine," 261 + " but its not the most optimal solution. Please consider changing your interceptor to comply."); 262 263 // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway 264 // however its not the most optimal solution, but we can still run. 265 InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target); 266 wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next); 267 bridge.setTarget(wrapped); 268 wrapped = bridge; 269 } 270 if (!(wrapped instanceof WrapProcessor)) { 271 // wrap the target so it becomes a service and we can manage its lifecycle 272 wrapped = new WrapProcessor(wrapped, target); 273 } 274 target = wrapped; 275 } 276 277 if (routeContext.isStreamCaching()) { 278 addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy())); 279 } 280 281 if (routeContext.getDelayer() != null && routeContext.getDelayer() > 0) { 282 addAdvice(new DelayerAdvice(routeContext.getDelayer())); 283 } 284 285 // sets the delegate to our wrapped output 286 output = target; 287 } 288 289 @Override 290 public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { 291 // noop 292 } 293 294 private InterceptStrategy getOrCreateTracer() { 295 // only use tracer if explicit enabled 296 if (camelContext.isTracing() != null && !camelContext.isTracing()) { 297 return null; 298 } 299 300 InterceptStrategy tracer = Tracer.getTracer(camelContext); 301 if (tracer == null) { 302 if (camelContext.getRegistry() != null) { 303 // lookup in registry 304 Map<String, Tracer> map = camelContext.getRegistry().findByTypeWithName(Tracer.class); 305 if (map.size() == 1) { 306 tracer = map.values().iterator().next(); 307 } 308 } 309 if (tracer == null) { 310 // fallback to use the default tracer 311 tracer = camelContext.getDefaultTracer(); 312 313 // configure and use any trace formatter if any exists 314 Map<String, TraceFormatter> formatters = camelContext.getRegistry().findByTypeWithName(TraceFormatter.class); 315 if (formatters.size() == 1) { 316 TraceFormatter formatter = formatters.values().iterator().next(); 317 if (tracer instanceof Tracer) { 318 ((Tracer) tracer).setFormatter(formatter); 319 } 320 } 321 } 322 } 323 324 return tracer; 325 } 326 327 private InterceptStrategy getOrCreateBacklogTracer() { 328 InterceptStrategy tracer = BacklogTracer.getBacklogTracer(camelContext); 329 if (tracer == null) { 330 if (camelContext.getRegistry() != null) { 331 // lookup in registry 332 Map<String, BacklogTracer> map = camelContext.getRegistry().findByTypeWithName(BacklogTracer.class); 333 if (map.size() == 1) { 334 tracer = map.values().iterator().next(); 335 } 336 } 337 if (tracer == null) { 338 // fallback to use the default tracer 339 tracer = camelContext.getDefaultBacklogTracer(); 340 } 341 } 342 343 return tracer; 344 } 345 346 private InterceptStrategy getOrCreateBacklogDebugger() { 347 InterceptStrategy debugger = BacklogDebugger.getBacklogDebugger(camelContext); 348 if (debugger == null) { 349 if (camelContext.getRegistry() != null) { 350 // lookup in registry 351 Map<String, BacklogDebugger> map = camelContext.getRegistry().findByTypeWithName(BacklogDebugger.class); 352 if (map.size() == 1) { 353 debugger = map.values().iterator().next(); 354 } 355 } 356 if (debugger == null) { 357 // fallback to use the default debugger 358 debugger = camelContext.getDefaultBacklogDebugger(); 359 } 360 } 361 362 return debugger; 363 } 364 365 @Override 366 public String toString() { 367 // just output the next processor as all the interceptors and error handler is just too verbose 368 return "Channel[" + nextProcessor + "]"; 369 } 370 371 }