/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Map;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointConfiguration;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.processor.PipelineHelper.continueProcessing;

/**
 * An endpoint that wraps another (delegate) endpoint so that exchanges sent to it
 * are first routed through a detour processor and then, unless skipping applies,
 * sent on to the original endpoint.
 * <p/>
 * All other endpoint operations are forwarded to the delegate unchanged.
 *
 * @version 
 */
public class InterceptSendToEndpoint implements Endpoint {

    private static final Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpoint.class);

    private final Endpoint delegate;
    private final boolean skip;
    private Processor detour;

    /**
     * Intercepts sending to the given endpoint
     *
     * @param destination  the original endpoint
     * @param skip <tt>true</tt> to skip sending after the detour to the original endpoint
     */
    public InterceptSendToEndpoint(final Endpoint destination, boolean skip) {
        this.delegate = destination;
        this.skip = skip;
    }

    /**
     * Sets the processor that intercepted exchanges are detoured to before
     * they are (optionally) sent to the original endpoint.
     *
     * @param detour the detour processor
     */
    public void setDetour(Processor detour) {
        this.detour = detour;
    }

    /**
     * Gets the original endpoint which is being intercepted.
     *
     * @return the delegate endpoint
     */
    public Endpoint getDelegate() {
        return delegate;
    }

    public String getEndpointUri() {
        return delegate.getEndpointUri();
    }

    public EndpointConfiguration getEndpointConfiguration() {
        return delegate.getEndpointConfiguration();
    }

    public String getEndpointKey() {
        return delegate.getEndpointKey();
    }

    public Exchange createExchange() {
        return delegate.createExchange();
    }

    public Exchange createExchange(ExchangePattern pattern) {
        return delegate.createExchange(pattern);
    }

    public Exchange createExchange(Exchange exchange) {
        return delegate.createExchange(exchange);
    }

    public CamelContext getCamelContext() {
        return delegate.getCamelContext();
    }

    /**
     * Creates a producer which runs the detour before sending to the original endpoint.
     * <p/>
     * Each invocation creates a new producer from the delegate endpoint, and the returned
     * producer is bound to exactly that instance. The delegate producer is deliberately held
     * in a local variable and not in a field of this endpoint: with a shared field, a later
     * call to this method would re-point every previously created producer at the newest
     * delegate producer, so they would route to, start and stop the wrong instance.
     *
     * @return the intercepting producer
     * @throws Exception is thrown if the delegate endpoint fails to create its producer
     */
    public Producer createProducer() throws Exception {
        // per-call instance, captured by the anonymous class below
        final Producer producer = delegate.createProducer();

        return new DefaultAsyncProducer(delegate) {

            public Endpoint getEndpoint() {
                return producer.getEndpoint();
            }

            public Exchange createExchange() {
                return producer.createExchange();
            }

            public Exchange createExchange(ExchangePattern pattern) {
                return producer.createExchange(pattern);
            }

            public Exchange createExchange(Exchange exchange) {
                return producer.createExchange(exchange);
            }

            /**
             * Runs the detour synchronously and then, unless skipping applies, forwards
             * the exchange to the producer of the original endpoint.
             *
             * @param exchange the exchange being sent
             * @param callback notified with <tt>true</tt> on every path completed in this method;
             *                 handed over to the original producer when that is an {@link AsyncProcessor}
             * @return <tt>true</tt> if completed in this method, otherwise whatever the
             *         asynchronous original producer returns
             */
            @Override
            public boolean process(Exchange exchange, AsyncCallback callback) {
                // process the detour so we do the detour routing
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, exchange});
                }
                // add header with the real endpoint uri
                exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri());

                // detour the exchange using synchronous processing
                try {
                    detour.process(exchange);
                } catch (Exception e) {
                    // a failing detour fails the exchange, and nothing is sent to the original endpoint
                    exchange.setException(e);
                    callback.done(true);
                    return true;
                }

                // Decide whether to continue or not; similar logic to the Pipeline
                // check for error if so we should break out
                if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) {
                    callback.done(true);
                    return true;
                }

                // determine if we should skip or not
                boolean shouldSkip = skip;

                // if the interceptor had a when predicate, then we should only skip if it matched
                // (the property is removed so it does not travel further with the exchange)
                Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
                if (whenMatches != null) {
                    shouldSkip = skip && whenMatches;
                }

                if (shouldSkip) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);
                    }
                    callback.done(true);
                    return true;
                }

                if (exchange.hasOut()) {
                    // replace OUT with IN as detour changed something
                    exchange.setIn(exchange.getOut());
                    exchange.setOut(null);
                }

                // route to original destination leveraging the asynchronous routing engine if possible
                if (producer instanceof AsyncProcessor) {
                    AsyncProcessor async = (AsyncProcessor) producer;
                    return async.process(exchange, callback);
                }

                // synchronous fallback: any failure is recorded on the exchange
                try {
                    producer.process(exchange);
                } catch (Exception e) {
                    exchange.setException(e);
                }
                callback.done(true);
                return true;
            }

            public boolean isSingleton() {
                return producer.isSingleton();
            }

            public void start() throws Exception {
                ServiceHelper.startService(detour);
                // here we also need to start the producer
                ServiceHelper.startService(producer);
            }

            public void stop() throws Exception {
                // do not stop detour as it should only be stopped when the interceptor stops
                // we should stop the producer here
                ServiceHelper.stopService(producer);
            }
        };
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        return delegate.createConsumer(processor);
    }

    public PollingConsumer createPollingConsumer() throws Exception {
        return delegate.createPollingConsumer();
    }

    public void configureProperties(Map<String, Object> options) {
        delegate.configureProperties(options);
    }

    public void setCamelContext(CamelContext context) {
        delegate.setCamelContext(context);
    }

    public boolean isLenientProperties() {
        return delegate.isLenientProperties();
    }

    public boolean isSingleton() {
        return delegate.isSingleton();
    }

    /**
     * Starts the detour and then the original endpoint.
     */
    public void start() throws Exception {
        ServiceHelper.startServices(detour, delegate);
    }

    /**
     * Stops the original endpoint and then the detour.
     */
    public void stop() throws Exception {
        ServiceHelper.stopServices(delegate, detour);
    }

    @Override
    public String toString() {
        return delegate.toString();
    }
}