001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import org.apache.camel.AsyncProcessor;
020    import org.apache.camel.Consumer;
021    import org.apache.camel.Endpoint;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Route;
025    import org.apache.camel.RouteAware;
026    import org.apache.camel.spi.ExceptionHandler;
027    import org.apache.camel.spi.UnitOfWork;
028    import org.apache.camel.support.ServiceSupport;
029    import org.apache.camel.util.AsyncProcessorConverterHelper;
030    import org.apache.camel.util.ServiceHelper;
031    import org.apache.camel.util.URISupport;
032    import org.apache.camel.util.UnitOfWorkHelper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * A default consumer useful for implementation inheritance.
038     *
039     * @version 
040     */
041    public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
042        protected final Logger log = LoggerFactory.getLogger(getClass());
043        private final Endpoint endpoint;
044        private final Processor processor;
045        private volatile AsyncProcessor asyncProcessor;
046        private ExceptionHandler exceptionHandler;
047        private Route route;
048    
049        public DefaultConsumer(Endpoint endpoint, Processor processor) {
050            this.endpoint = endpoint;
051            this.processor = processor;
052            this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
053        }
054    
055        @Override
056        public String toString() {
057            return "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]";
058        }
059    
060        public Route getRoute() {
061            return route;
062        }
063    
064        public void setRoute(Route route) {
065            this.route = route;
066        }
067    
068        /**
069         * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
070         * the processed {@link Exchange} then this method should be use to create and start
071         * the {@link UnitOfWork} on the exchange.
072         *
073         * @param exchange the exchange
074         * @return the created and started unit of work
075         * @throws Exception is thrown if error starting the unit of work
076         *
077         * @see #doneUoW(org.apache.camel.Exchange)
078         */
079        public UnitOfWork createUoW(Exchange exchange) throws Exception {
080            // if the exchange doesn't have from route id set, then set it if it originated
081            // from this unit of work
082            if (route != null && exchange.getFromRouteId() == null) {
083                exchange.setFromRouteId(route.getId());
084            }
085    
086            UnitOfWork uow = endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
087            exchange.setUnitOfWork(uow);
088            uow.start();
089            return uow;
090        }
091    
092        /**
093         * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
094         * the processed {@link Exchange} then this method should be executed when the consumer
095         * is finished processing the message.
096         *
097         * @param exchange the exchange
098         *
099         * @see #createUoW(org.apache.camel.Exchange)
100         */
101        public void doneUoW(Exchange exchange) {
102            UnitOfWorkHelper.doneUow(exchange.getUnitOfWork(), exchange);
103        }
104    
105        public Endpoint getEndpoint() {
106            return endpoint;
107        }
108    
109        public Processor getProcessor() {
110            return processor;
111        }
112    
113        /**
114         * Provides an {@link org.apache.camel.AsyncProcessor} interface to the configured
115         * processor on the consumer. If the processor does not implement the interface,
116         * it will be adapted so that it does.
117         */
118        public synchronized AsyncProcessor getAsyncProcessor() {
119            if (asyncProcessor == null) {            
120                asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
121            }
122            return asyncProcessor;
123        }
124    
125        public ExceptionHandler getExceptionHandler() {
126            return exceptionHandler;
127        }
128    
129        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
130            this.exceptionHandler = exceptionHandler;
131        }
132    
133        protected void doStop() throws Exception {
134            log.debug("Stopping consumer: {}", this);
135            ServiceHelper.stopServices(processor);
136        }
137    
138        protected void doStart() throws Exception {
139            log.debug("Starting consumer: {}", this);
140            ServiceHelper.startServices(processor);
141        }
142    
143        /**
144         * Handles the given exception using the {@link #getExceptionHandler()}
145         * 
146         * @param t the exception to handle
147         */
148        protected void handleException(Throwable t) {
149            Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
150            getExceptionHandler().handleException(newt);
151        }
152    
153        /**
154         * Handles the given exception using the {@link #getExceptionHandler()}
155         *
156         * @param message additional message about the exception
157         * @param t the exception to handle
158         */
159        protected void handleException(String message, Throwable t) {
160            Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
161            getExceptionHandler().handleException(message, newt);
162        }
163    }