001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.List;
022    import java.util.Map;
023    
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.CamelContextAware;
027    import org.apache.camel.Channel;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Processor;
030    import org.apache.camel.model.ModelChannel;
031    import org.apache.camel.model.ProcessorDefinition;
032    import org.apache.camel.model.ProcessorDefinitionHelper;
033    import org.apache.camel.model.RouteDefinition;
034    import org.apache.camel.model.RouteDefinitionHelper;
035    import org.apache.camel.processor.CamelInternalProcessor;
036    import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
037    import org.apache.camel.processor.WrapProcessor;
038    import org.apache.camel.spi.InterceptStrategy;
039    import org.apache.camel.spi.RouteContext;
040    import org.apache.camel.util.OrderedComparator;
041    import org.apache.camel.util.ServiceHelper;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * DefaultChannel is the default {@link Channel}.
047     * <p/>
048     * The current implementation is just a composite containing the interceptors and error handler
049     * that beforehand was added to the route graph directly.
050     * <br/>
051     * With this {@link Channel} we can in the future implement better strategies for routing the
052     * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
053     * in the graph.
054     *
055     * @version 
056     */
057    public class DefaultChannel extends CamelInternalProcessor implements ModelChannel {
058    
059        private static final Logger LOG = LoggerFactory.getLogger(DefaultChannel.class);
060    
061        private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
062        private Processor errorHandler;
063        // the next processor (non wrapped)
064        private Processor nextProcessor;
065        // the real output to invoke that has been wrapped
066        private Processor output;
067        private ProcessorDefinition<?> definition;
068        private ProcessorDefinition<?> childDefinition;
069        private CamelContext camelContext;
070        private RouteContext routeContext;
071    
072        public void setNextProcessor(Processor next) {
073            this.nextProcessor = next;
074        }
075    
076        public Processor getOutput() {
077            // the errorHandler is already decorated with interceptors
078            // so it contain the entire chain of processors, so we can safely use it directly as output
079            // if no error handler provided we use the output
080            // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the
081            // the error handlers, interceptors, etc. woven in at design time
082            return errorHandler != null ? errorHandler : output;
083        }
084    
085        @Override
086        public boolean hasNext() {
087            return nextProcessor != null;
088        }
089    
090        @Override
091        public List<Processor> next() {
092            if (!hasNext()) {
093                return null;
094            }
095            List<Processor> answer = new ArrayList<Processor>(1);
096            answer.add(nextProcessor);
097            return answer;
098        }
099    
100        public void setOutput(Processor output) {
101            this.output = output;
102        }
103    
104        public Processor getNextProcessor() {
105            return nextProcessor;
106        }
107    
108        public boolean hasInterceptorStrategy(Class<?> type) {
109            for (InterceptStrategy strategy : interceptors) {
110                if (type.isInstance(strategy)) {
111                    return true;
112                }
113            }
114            return false;
115        }
116    
117        public void setErrorHandler(Processor errorHandler) {
118            this.errorHandler = errorHandler;
119        }
120    
121        public Processor getErrorHandler() {
122            return errorHandler;
123        }
124    
125        public void addInterceptStrategy(InterceptStrategy strategy) {
126            interceptors.add(strategy);
127        }
128    
129        public void addInterceptStrategies(List<InterceptStrategy> strategies) {
130            interceptors.addAll(strategies);
131        }
132    
133        public List<InterceptStrategy> getInterceptStrategies() {
134            return interceptors;
135        }
136    
137        public ProcessorDefinition<?> getProcessorDefinition() {
138            return definition;
139        }
140    
141        public void setChildDefinition(ProcessorDefinition<?> childDefinition) {
142            this.childDefinition = childDefinition;
143        }
144    
145        public RouteContext getRouteContext() {
146            return routeContext;
147        }
148    
149        @Override
150        protected void doStart() throws Exception {
151            // the output has now been created, so assign the output as the processor
152            setProcessor(getOutput());
153            ServiceHelper.startServices(errorHandler, output);
154        }
155    
156        @Override
157        protected void doStop() throws Exception {
158            ServiceHelper.stopServices(output, errorHandler);
159        }
160    
161        public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
162            this.routeContext = routeContext;
163            this.definition = outputDefinition;
164            this.camelContext = routeContext.getCamelContext();
165    
166            Processor target = nextProcessor;
167            Processor next;
168    
169            // init CamelContextAware as early as possible on target
170            if (target instanceof CamelContextAware) {
171                ((CamelContextAware) target).setCamelContext(camelContext);
172            }
173    
174            // the definition to wrap should be the fine grained,
175            // so if a child is set then use it, if not then its the original output used
176            ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition;
177            LOG.debug("Initialize channel for target: '{}'", targetOutputDef);
178    
179            // fix parent/child relationship. This will be the case of the routes has been
180            // defined using XML DSL or end user may have manually assembled a route from the model.
181            // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders)
182            // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors
183            // then we need to fix the parent/child relationship beforehand, and thus we can do it here
184            // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0)
185            if (childDefinition != null && outputDefinition != childDefinition) {
186                childDefinition.setParent(outputDefinition);
187            }
188    
189            // force the creation of an id
190            RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition);
191    
192            // first wrap the output with the managed strategy if any
193            InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
194            if (managed != null) {
195                next = target == nextProcessor ? null : nextProcessor;
196                target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
197            }
198    
199            // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog)
200            InterceptStrategy tracer = getOrCreateBacklogTracer();
201            camelContext.addService(tracer);
202            if (tracer instanceof BacklogTracer) {
203                BacklogTracer backlogTracer = (BacklogTracer) tracer;
204    
205                RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition);
206                boolean first = false;
207                if (route != null && !route.getOutputs().isEmpty()) {
208                    first = route.getOutputs().get(0) == definition;
209                }
210    
211                addAdvice(new BacklogTracerAdvice(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first));
212    
213                // add debugger as well so we have both tracing and debugging out of the box
214                InterceptStrategy debugger = getOrCreateBacklogDebugger();
215                camelContext.addService(debugger);
216                if (debugger instanceof BacklogDebugger) {
217                    BacklogDebugger backlogDebugger = (BacklogDebugger) debugger;
218                    addAdvice(new BacklogDebuggerAdvice(backlogDebugger, target, targetOutputDef));
219                }
220            }
221    
222            if (routeContext.isMessageHistory()) {
223                // add message history advice
224                addAdvice(new MessageHistoryAdvice(targetOutputDef));
225            }
226    
227            // the regular tracer is not a task on internalProcessor as this is not really needed
228            // end users have to explicit enable the tracer to use it, and then its okay if we wrap
229            // the processors (but by default tracer is disabled, and therefore we do not wrap processors)
230            tracer = getOrCreateTracer();
231            camelContext.addService(tracer);
232            if (tracer != null) {
233                TraceInterceptor trace = (TraceInterceptor) tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null);
234                // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly
235                trace.setRouteContext(routeContext);
236                target = trace;
237            }
238    
239            // sort interceptors according to ordered
240            Collections.sort(interceptors, new OrderedComparator());
241            // then reverse list so the first will be wrapped last, as it would then be first being invoked
242            Collections.reverse(interceptors);
243            // wrap the output with the configured interceptors
244            for (InterceptStrategy strategy : interceptors) {
245                next = target == nextProcessor ? null : nextProcessor;
246                // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy
247                if (strategy instanceof Tracer) {
248                    continue;
249                }
250                // skip stream caching as it must be wrapped as outer most, which we do later
251                if (strategy instanceof StreamCaching) {
252                    continue;
253                }
254                // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
255                Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
256                if (!(wrapped instanceof AsyncProcessor)) {
257                    LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
258                            + " This causes the asynchronous routing engine to not work as optimal as possible."
259                            + " See more details at the InterceptStrategy javadoc."
260                            + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
261                            + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
262    
263                    // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
264                    // however its not the most optimal solution, but we can still run.
265                    InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
266                    wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
267                    bridge.setTarget(wrapped);
268                    wrapped = bridge;
269                }
270                if (!(wrapped instanceof WrapProcessor)) {
271                    // wrap the target so it becomes a service and we can manage its lifecycle
272                    wrapped = new WrapProcessor(wrapped, target);
273                }
274                target = wrapped;
275            }
276    
277            if (routeContext.isStreamCaching()) {
278                addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy()));
279            }
280    
281            if (routeContext.getDelayer() != null && routeContext.getDelayer() > 0) {
282                addAdvice(new DelayerAdvice(routeContext.getDelayer()));
283            }
284    
285            // sets the delegate to our wrapped output
286            output = target;
287        }
288    
289        @Override
290        public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
291            // noop
292        }
293    
294        private InterceptStrategy getOrCreateTracer() {
295            // only use tracer if explicit enabled
296            if (camelContext.isTracing() != null && !camelContext.isTracing()) {
297                return null;
298            }
299    
300            InterceptStrategy tracer = Tracer.getTracer(camelContext);
301            if (tracer == null) {
302                if (camelContext.getRegistry() != null) {
303                    // lookup in registry
304                    Map<String, Tracer> map = camelContext.getRegistry().findByTypeWithName(Tracer.class);
305                    if (map.size() == 1) {
306                        tracer = map.values().iterator().next();
307                    }
308                }
309                if (tracer == null) {
310                    // fallback to use the default tracer
311                    tracer = camelContext.getDefaultTracer();
312    
313                    // configure and use any trace formatter if any exists
314                    Map<String, TraceFormatter> formatters = camelContext.getRegistry().findByTypeWithName(TraceFormatter.class);
315                    if (formatters.size() == 1) {
316                        TraceFormatter formatter = formatters.values().iterator().next();
317                        if (tracer instanceof Tracer) {
318                            ((Tracer) tracer).setFormatter(formatter);
319                        }
320                    }
321                }
322            }
323    
324            return tracer;
325        }
326    
327        private InterceptStrategy getOrCreateBacklogTracer() {
328            InterceptStrategy tracer = BacklogTracer.getBacklogTracer(camelContext);
329            if (tracer == null) {
330                if (camelContext.getRegistry() != null) {
331                    // lookup in registry
332                    Map<String, BacklogTracer> map = camelContext.getRegistry().findByTypeWithName(BacklogTracer.class);
333                    if (map.size() == 1) {
334                        tracer = map.values().iterator().next();
335                    }
336                }
337                if (tracer == null) {
338                    // fallback to use the default tracer
339                    tracer = camelContext.getDefaultBacklogTracer();
340                }
341            }
342    
343            return tracer;
344        }
345    
346        private InterceptStrategy getOrCreateBacklogDebugger() {
347            InterceptStrategy debugger = BacklogDebugger.getBacklogDebugger(camelContext);
348            if (debugger == null) {
349                if (camelContext.getRegistry() != null) {
350                    // lookup in registry
351                    Map<String, BacklogDebugger> map = camelContext.getRegistry().findByTypeWithName(BacklogDebugger.class);
352                    if (map.size() == 1) {
353                        debugger = map.values().iterator().next();
354                    }
355                }
356                if (debugger == null) {
357                    // fallback to use the default debugger
358                    debugger = camelContext.getDefaultBacklogDebugger();
359                }
360            }
361    
362            return debugger;
363        }
364    
365        @Override
366        public String toString() {
367            // just output the next processor as all the interceptors and error handler is just too verbose
368            return "Channel[" + nextProcessor + "]";
369        }
370    
371    }