001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.HashMap; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.AsyncProducerCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.ExchangePattern; 028 import org.apache.camel.Producer; 029 import org.apache.camel.ServicePoolAware; 030 import org.apache.camel.Traceable; 031 import org.apache.camel.impl.InterceptSendToEndpoint; 032 import org.apache.camel.impl.ProducerCache; 033 import org.apache.camel.support.ServiceSupport; 034 import org.apache.camel.util.AsyncProcessorConverterHelper; 035 import org.apache.camel.util.AsyncProcessorHelper; 036 import org.apache.camel.util.EventHelper; 037 import org.apache.camel.util.ObjectHelper; 038 import org.apache.camel.util.ServiceHelper; 039 import org.apache.camel.util.StopWatch; 040 import org.apache.camel.util.URISupport; 041 import org.slf4j.Logger; 042 import org.slf4j.LoggerFactory; 043 044 /** 045 * Processor for forwarding exchanges to an endpoint destination. 046 * 047 * @version 048 */ 049 public class SendProcessor extends ServiceSupport implements AsyncProcessor, Traceable { 050 protected static final Logger LOG = LoggerFactory.getLogger(SendProcessor.class); 051 protected final CamelContext camelContext; 052 protected final ExchangePattern pattern; 053 protected ProducerCache producerCache; 054 protected AsyncProcessor producer; 055 protected Endpoint destination; 056 057 public SendProcessor(Endpoint destination) { 058 this(destination, null); 059 } 060 061 public SendProcessor(Endpoint destination, ExchangePattern pattern) { 062 ObjectHelper.notNull(destination, "destination"); 063 this.destination = destination; 064 this.camelContext = destination.getCamelContext(); 065 this.pattern = pattern; 066 ObjectHelper.notNull(this.camelContext, "camelContext"); 067 } 068 069 @Override 070 public String toString() { 071 return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")"; 072 } 073 074 /** 075 * @deprecated not longer supported. 076 */ 077 @Deprecated 078 public void setDestination(Endpoint destination) { 079 } 080 081 public String getTraceLabel() { 082 return URISupport.sanitizeUri(destination.getEndpointUri()); 083 } 084 085 public void process(final Exchange exchange) throws Exception { 086 AsyncProcessorHelper.process(this, exchange); 087 } 088 089 public boolean process(Exchange exchange, final AsyncCallback callback) { 090 if (!isStarted()) { 091 exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); 092 callback.done(true); 093 return true; 094 } 095 096 // we should preserve existing MEP so remember old MEP 097 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 098 final ExchangePattern existingPattern = exchange.getPattern(); 099 100 // if we have a producer then use that as its optimized 101 if (producer != null) { 102 103 // record timing for sending the exchange using the producer 104 final StopWatch watch = new StopWatch(); 105 106 final Exchange target = configureExchange(exchange, pattern); 107 108 EventHelper.notifyExchangeSending(exchange.getContext(), target, destination); 109 LOG.debug(">>>> {} {}", destination, exchange); 110 111 boolean sync = true; 112 try { 113 sync = producer.process(exchange, new AsyncCallback() { 114 @Override 115 public void done(boolean doneSync) { 116 try { 117 // restore previous MEP 118 target.setPattern(existingPattern); 119 // emit event that the exchange was sent to the endpoint 120 long timeTaken = watch.stop(); 121 EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken); 122 } finally { 123 callback.done(doneSync); 124 } 125 } 126 }); 127 } catch (Throwable throwable) { 128 if (exchange != null) { 129 exchange.setException(throwable); 130 } 131 132 } 133 134 return sync; 135 } 136 137 // send the exchange to the destination using the producer cache for the non optimized producers 138 return producerCache.doInAsyncProducer(destination, exchange, pattern, callback, new AsyncProducerCallback() { 139 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 140 ExchangePattern pattern, final AsyncCallback callback) { 141 final Exchange target = configureExchange(exchange, pattern); 142 LOG.debug(">>>> {} {}", destination, exchange); 143 return asyncProducer.process(target, new AsyncCallback() { 144 public void done(boolean doneSync) { 145 // restore previous MEP 146 target.setPattern(existingPattern); 147 // signal we are done 148 callback.done(doneSync); 149 } 150 }); 151 } 152 }); 153 } 154 155 public Endpoint getDestination() { 156 return destination; 157 } 158 159 public ExchangePattern getPattern() { 160 return pattern; 161 } 162 163 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) { 164 if (pattern != null) { 165 exchange.setPattern(pattern); 166 } 167 // set property which endpoint we send to 168 exchange.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); 169 return exchange; 170 } 171 172 protected void doStart() throws Exception { 173 if (producerCache == null) { 174 // use a single producer cache as we need to only hold reference for one destination 175 // and use a regular HashMap as we do not want a soft reference store that may get re-claimed when low on memory 176 // as we want to ensure the producer is kept around, to ensure its lifecycle is fully managed, 177 // eg stopping the producer when we stop etc. 178 producerCache = new ProducerCache(this, camelContext, new HashMap<String, Producer>(1)); 179 // do not add as service as we do not want to manage the producer cache 180 } 181 ServiceHelper.startService(producerCache); 182 183 // the destination could since have been intercepted by a interceptSendToEndpoint so we got to 184 // lookup this before we can use the destination 185 Endpoint lookup = camelContext.hasEndpoint(destination.getEndpointKey()); 186 if (lookup instanceof InterceptSendToEndpoint) { 187 if (LOG.isDebugEnabled()) { 188 LOG.debug("Intercepted sending to {} -> {}", 189 URISupport.sanitizeUri(destination.getEndpointUri()), URISupport.sanitizeUri(lookup.getEndpointUri())); 190 } 191 destination = lookup; 192 } 193 // warm up the producer by starting it so we can fail fast if there was a problem 194 // however must start endpoint first 195 ServiceHelper.startService(destination); 196 197 // this SendProcessor is used a lot in Camel (eg every .to in the route DSL) and therefore we 198 // want to optimize for regular producers, by using the producer directly instead of the ProducerCache 199 // Only for pooled and non singleton producers we have to use the ProducerCache as it supports these 200 // kind of producer better (though these kind of producer should be rare) 201 202 Producer producer = producerCache.acquireProducer(destination); 203 if (producer instanceof ServicePoolAware || !producer.isSingleton()) { 204 // no we cannot optimize it - so release the producer back to the producer cache 205 // and use the producer cache for sending 206 producerCache.releaseProducer(destination, producer); 207 } else { 208 // yes we can optimize and use the producer directly for sending 209 this.producer = AsyncProcessorConverterHelper.convert(producer); 210 } 211 } 212 213 protected void doStop() throws Exception { 214 ServiceHelper.stopServices(producerCache, producer); 215 } 216 217 protected void doShutdown() throws Exception { 218 ServiceHelper.stopAndShutdownServices(producerCache, producer); 219 } 220 }