001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.List;
020    
021    import org.apache.camel.CamelContext;
022    import org.apache.camel.ConsumerTemplate;
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.spi.Synchronization;
026    import org.apache.camel.support.ServiceSupport;
027    import org.apache.camel.util.CamelContextHelper;
028    import org.apache.camel.util.ServiceHelper;
029    import org.apache.camel.util.UnitOfWorkHelper;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
034    
035    /**
036     * Template (named like Spring's TransactionTemplate & JmsTemplate
037     * et al) for working with Camel and consuming {@link org.apache.camel.Message} instances in an
038     * {@link Exchange} from an {@link Endpoint}.
039     *
040     * @version 
041     */
042    public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class);
045        private final CamelContext camelContext;
046        private ConsumerCache consumerCache;
047        private int maximumCacheSize;
048    
049        public DefaultConsumerTemplate(CamelContext camelContext) {
050            this.camelContext = camelContext;
051        }
052    
053        public int getMaximumCacheSize() {
054            return maximumCacheSize;
055        }
056    
057        public void setMaximumCacheSize(int maximumCacheSize) {
058            this.maximumCacheSize = maximumCacheSize;
059        }
060    
061        public int getCurrentCacheSize() {
062            if (consumerCache == null) {
063                return 0;
064            }
065            return consumerCache.size();
066        }
067    
068        /**
069         * @deprecated use {@link #getCamelContext()}
070         */
071        @Deprecated
072        public CamelContext getContext() {
073            return getCamelContext();
074        }
075    
076        public CamelContext getCamelContext() {
077            return camelContext;
078        }
079    
080        public Exchange receive(String endpointUri) {
081            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
082            return getConsumerCache().receive(endpoint);
083        }
084    
085        public Exchange receive(Endpoint endpoint) {
086            return receive(endpoint.getEndpointUri());
087        }
088    
089        public Exchange receive(String endpointUri, long timeout) {
090            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
091            return getConsumerCache().receive(endpoint, timeout);
092        }
093    
094        public Exchange receive(Endpoint endpoint, long timeout) {
095            return receive(endpoint.getEndpointUri(), timeout);
096        }
097    
098        public Exchange receiveNoWait(String endpointUri) {
099            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
100            return getConsumerCache().receiveNoWait(endpoint);
101        }
102    
103        public Exchange receiveNoWait(Endpoint endpoint) {
104            return receiveNoWait(endpoint.getEndpointUri());
105        }
106    
107        public Object receiveBody(String endpointUri) {
108            Object answer = null;
109            Exchange exchange = receive(endpointUri);
110            try {
111                answer = extractResultBody(exchange);
112            } finally {
113                doneUoW(exchange);
114            }
115            return answer;
116        }
117    
118        public Object receiveBody(Endpoint endpoint) {
119            return receiveBody(endpoint.getEndpointUri());
120        }
121    
122        public Object receiveBody(String endpointUri, long timeout) {
123            Object answer = null;
124            Exchange exchange = receive(endpointUri, timeout);
125            try {
126                answer = extractResultBody(exchange);
127            } finally {
128                doneUoW(exchange);
129            }
130            return answer;
131        }
132    
133        public Object receiveBody(Endpoint endpoint, long timeout) {
134            return receiveBody(endpoint.getEndpointUri(), timeout);
135        }
136    
137        public Object receiveBodyNoWait(String endpointUri) {
138            Object answer = null;
139            Exchange exchange = receiveNoWait(endpointUri);
140            try {
141                answer = extractResultBody(exchange);
142            } finally {
143                doneUoW(exchange);
144            }
145            return answer;
146        }
147    
148        public Object receiveBodyNoWait(Endpoint endpoint) {
149            return receiveBodyNoWait(endpoint.getEndpointUri());
150        }
151    
152        @SuppressWarnings("unchecked")
153        public <T> T receiveBody(String endpointUri, Class<T> type) {
154            Object answer = null;
155            Exchange exchange = receive(endpointUri);
156            try {
157                answer = extractResultBody(exchange);
158                answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
159            } finally {
160                doneUoW(exchange);
161            }
162            return (T) answer;
163        }
164    
165        public <T> T receiveBody(Endpoint endpoint, Class<T> type) {
166            return receiveBody(endpoint.getEndpointUri(), type);
167        }
168    
169        @SuppressWarnings("unchecked")
170        public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) {
171            Object answer = null;
172            Exchange exchange = receive(endpointUri, timeout);
173            try {
174                answer = extractResultBody(exchange);
175                answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
176            } finally {
177                doneUoW(exchange);
178            }
179            return (T) answer;
180        }
181    
182        public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) {
183            return receiveBody(endpoint.getEndpointUri(), timeout, type);
184        }
185    
186        @SuppressWarnings("unchecked")
187        public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) {
188            Object answer = null;
189            Exchange exchange = receiveNoWait(endpointUri);
190            try {
191                answer = extractResultBody(exchange);
192                answer = camelContext.getTypeConverter().convertTo(type, exchange, answer);
193            } finally {
194                doneUoW(exchange);
195            }
196            return (T) answer;
197        }
198    
199        public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) {
200            return receiveBodyNoWait(endpoint.getEndpointUri(), type);
201        }
202    
203        public void doneUoW(Exchange exchange) {
204            try {
205                // The receiveBody method will get a null exchange
206                if (exchange == null) {
207                    return;
208                }
209                if (exchange.getUnitOfWork() == null) {
210                    // handover completions and done them manually to ensure they are being executed
211                    List<Synchronization> synchronizations = exchange.handoverCompletions();
212                    UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG);
213                } else {
214                    // done the unit of work
215                    exchange.getUnitOfWork().done(exchange);
216                }
217            } catch (Throwable e) {
218                LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
219                        + ". This exception will be ignored.", e);
220            }
221        }
222    
223        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
224            return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri);
225        }
226    
227        /**
228         * Extracts the body from the given result.
229         * <p/>
230         * If the exchange pattern is provided it will try to honor it and retrieve the body
231         * from either IN or OUT according to the pattern.
232         *
233         * @param result   the result
234         * @return  the result, can be <tt>null</tt>.
235         */
236        protected Object extractResultBody(Exchange result) {
237            Object answer = null;
238            if (result != null) {
239                // rethrow if there was an exception
240                if (result.getException() != null) {
241                    throw wrapRuntimeCamelException(result.getException());
242                }
243    
244                // okay no fault then return the response
245                if (result.hasOut()) {
246                    // use OUT as the response
247                    answer = result.getOut().getBody();
248                } else {
249                    // use IN as the response
250                    answer = result.getIn().getBody();
251                }
252            }
253            return answer;
254        }
255    
256        private ConsumerCache getConsumerCache() {
257            if (!isStarted()) {
258                throw new IllegalStateException("ConsumerTemplate has not been started");
259            }
260            return consumerCache;
261        }
262    
263        protected void doStart() throws Exception {
264            if (consumerCache == null) {
265                if (maximumCacheSize > 0) {
266                    consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize);
267                } else {
268                    consumerCache = new ConsumerCache(this, camelContext);
269                }
270            }
271            ServiceHelper.startService(consumerCache);
272        }
273    
274        protected void doStop() throws Exception {
275            // we should shutdown the services as this is our intention, to not re-use the services anymore
276            ServiceHelper.stopAndShutdownService(consumerCache);
277            consumerCache = null;
278        }
279    
280    }