001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.List; 020 021 import org.apache.camel.CamelContext; 022 import org.apache.camel.ConsumerTemplate; 023 import org.apache.camel.Endpoint; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.spi.Synchronization; 026 import org.apache.camel.support.ServiceSupport; 027 import org.apache.camel.util.CamelContextHelper; 028 import org.apache.camel.util.ServiceHelper; 029 import org.apache.camel.util.UnitOfWorkHelper; 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; 034 035 /** 036 * Template (named like Spring's TransactionTemplate & JmsTemplate 037 * et al) for working with Camel and consuming {@link org.apache.camel.Message} instances in an 038 * {@link Exchange} from an {@link Endpoint}. 039 * 040 * @version 041 */ 042 public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerTemplate { 043 044 private static final Logger LOG = LoggerFactory.getLogger(DefaultConsumerTemplate.class); 045 private final CamelContext camelContext; 046 private ConsumerCache consumerCache; 047 private int maximumCacheSize; 048 049 public DefaultConsumerTemplate(CamelContext camelContext) { 050 this.camelContext = camelContext; 051 } 052 053 public int getMaximumCacheSize() { 054 return maximumCacheSize; 055 } 056 057 public void setMaximumCacheSize(int maximumCacheSize) { 058 this.maximumCacheSize = maximumCacheSize; 059 } 060 061 public int getCurrentCacheSize() { 062 if (consumerCache == null) { 063 return 0; 064 } 065 return consumerCache.size(); 066 } 067 068 /** 069 * @deprecated use {@link #getCamelContext()} 070 */ 071 @Deprecated 072 public CamelContext getContext() { 073 return getCamelContext(); 074 } 075 076 public CamelContext getCamelContext() { 077 return camelContext; 078 } 079 080 public Exchange receive(String endpointUri) { 081 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 082 return getConsumerCache().receive(endpoint); 083 } 084 085 public Exchange receive(Endpoint endpoint) { 086 return receive(endpoint.getEndpointUri()); 087 } 088 089 public Exchange receive(String endpointUri, long timeout) { 090 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 091 return getConsumerCache().receive(endpoint, timeout); 092 } 093 094 public Exchange receive(Endpoint endpoint, long timeout) { 095 return receive(endpoint.getEndpointUri(), timeout); 096 } 097 098 public Exchange receiveNoWait(String endpointUri) { 099 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); 100 return getConsumerCache().receiveNoWait(endpoint); 101 } 102 103 public Exchange receiveNoWait(Endpoint endpoint) { 104 return receiveNoWait(endpoint.getEndpointUri()); 105 } 106 107 public Object receiveBody(String endpointUri) { 108 Object answer = null; 109 Exchange exchange = receive(endpointUri); 110 try { 111 answer = extractResultBody(exchange); 112 } finally { 113 doneUoW(exchange); 114 } 115 return answer; 116 } 117 118 public Object receiveBody(Endpoint endpoint) { 119 return receiveBody(endpoint.getEndpointUri()); 120 } 121 122 public Object receiveBody(String endpointUri, long timeout) { 123 Object answer = null; 124 Exchange exchange = receive(endpointUri, timeout); 125 try { 126 answer = extractResultBody(exchange); 127 } finally { 128 doneUoW(exchange); 129 } 130 return answer; 131 } 132 133 public Object receiveBody(Endpoint endpoint, long timeout) { 134 return receiveBody(endpoint.getEndpointUri(), timeout); 135 } 136 137 public Object receiveBodyNoWait(String endpointUri) { 138 Object answer = null; 139 Exchange exchange = receiveNoWait(endpointUri); 140 try { 141 answer = extractResultBody(exchange); 142 } finally { 143 doneUoW(exchange); 144 } 145 return answer; 146 } 147 148 public Object receiveBodyNoWait(Endpoint endpoint) { 149 return receiveBodyNoWait(endpoint.getEndpointUri()); 150 } 151 152 @SuppressWarnings("unchecked") 153 public <T> T receiveBody(String endpointUri, Class<T> type) { 154 Object answer = null; 155 Exchange exchange = receive(endpointUri); 156 try { 157 answer = extractResultBody(exchange); 158 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 159 } finally { 160 doneUoW(exchange); 161 } 162 return (T) answer; 163 } 164 165 public <T> T receiveBody(Endpoint endpoint, Class<T> type) { 166 return receiveBody(endpoint.getEndpointUri(), type); 167 } 168 169 @SuppressWarnings("unchecked") 170 public <T> T receiveBody(String endpointUri, long timeout, Class<T> type) { 171 Object answer = null; 172 Exchange exchange = receive(endpointUri, timeout); 173 try { 174 answer = extractResultBody(exchange); 175 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 176 } finally { 177 doneUoW(exchange); 178 } 179 return (T) answer; 180 } 181 182 public <T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type) { 183 return receiveBody(endpoint.getEndpointUri(), timeout, type); 184 } 185 186 @SuppressWarnings("unchecked") 187 public <T> T receiveBodyNoWait(String endpointUri, Class<T> type) { 188 Object answer = null; 189 Exchange exchange = receiveNoWait(endpointUri); 190 try { 191 answer = extractResultBody(exchange); 192 answer = camelContext.getTypeConverter().convertTo(type, exchange, answer); 193 } finally { 194 doneUoW(exchange); 195 } 196 return (T) answer; 197 } 198 199 public <T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type) { 200 return receiveBodyNoWait(endpoint.getEndpointUri(), type); 201 } 202 203 public void doneUoW(Exchange exchange) { 204 try { 205 // The receiveBody method will get a null exchange 206 if (exchange == null) { 207 return; 208 } 209 if (exchange.getUnitOfWork() == null) { 210 // handover completions and done them manually to ensure they are being executed 211 List<Synchronization> synchronizations = exchange.handoverCompletions(); 212 UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, LOG); 213 } else { 214 // done the unit of work 215 exchange.getUnitOfWork().done(exchange); 216 } 217 } catch (Throwable e) { 218 LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange 219 + ". This exception will be ignored.", e); 220 } 221 } 222 223 protected Endpoint resolveMandatoryEndpoint(String endpointUri) { 224 return CamelContextHelper.getMandatoryEndpoint(camelContext, endpointUri); 225 } 226 227 /** 228 * Extracts the body from the given result. 229 * <p/> 230 * If the exchange pattern is provided it will try to honor it and retrieve the body 231 * from either IN or OUT according to the pattern. 232 * 233 * @param result the result 234 * @return the result, can be <tt>null</tt>. 235 */ 236 protected Object extractResultBody(Exchange result) { 237 Object answer = null; 238 if (result != null) { 239 // rethrow if there was an exception 240 if (result.getException() != null) { 241 throw wrapRuntimeCamelException(result.getException()); 242 } 243 244 // okay no fault then return the response 245 if (result.hasOut()) { 246 // use OUT as the response 247 answer = result.getOut().getBody(); 248 } else { 249 // use IN as the response 250 answer = result.getIn().getBody(); 251 } 252 } 253 return answer; 254 } 255 256 private ConsumerCache getConsumerCache() { 257 if (!isStarted()) { 258 throw new IllegalStateException("ConsumerTemplate has not been started"); 259 } 260 return consumerCache; 261 } 262 263 protected void doStart() throws Exception { 264 if (consumerCache == null) { 265 if (maximumCacheSize > 0) { 266 consumerCache = new ConsumerCache(this, camelContext, maximumCacheSize); 267 } else { 268 consumerCache = new ConsumerCache(this, camelContext); 269 } 270 } 271 ServiceHelper.startService(consumerCache); 272 } 273 274 protected void doStop() throws Exception { 275 // we should shutdown the services as this is our intention, to not re-use the services anymore 276 ServiceHelper.stopAndShutdownService(consumerCache); 277 consumerCache = null; 278 } 279 280 }