001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Iterator; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.AsyncProducerCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.ExchangePattern; 028 import org.apache.camel.Expression; 029 import org.apache.camel.FailedToCreateProducerException; 030 import org.apache.camel.Message; 031 import org.apache.camel.Producer; 032 import org.apache.camel.Traceable; 033 import org.apache.camel.builder.ExpressionBuilder; 034 import org.apache.camel.impl.DefaultExchange; 035 import org.apache.camel.impl.ProducerCache; 036 import org.apache.camel.support.ServiceSupport; 037 import org.apache.camel.util.AsyncProcessorHelper; 038 import org.apache.camel.util.ExchangeHelper; 039 import org.apache.camel.util.MessageHelper; 040 import org.apache.camel.util.ObjectHelper; 041 import org.apache.camel.util.ServiceHelper; 042 import org.slf4j.Logger; 043 import org.slf4j.LoggerFactory; 044 045 import static org.apache.camel.processor.PipelineHelper.continueProcessing; 046 import static org.apache.camel.util.ObjectHelper.notNull; 047 048 /** 049 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 050 * pattern where the list of actual endpoints to send a message exchange to are 051 * dependent on the value of a message header. 052 * <p/> 053 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation 054 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the 055 * pipeline to ensure it works the same and the async routing engine is flawless. 056 */ 057 public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable { 058 protected final Logger log = LoggerFactory.getLogger(getClass()); 059 protected ProducerCache producerCache; 060 protected boolean ignoreInvalidEndpoints; 061 protected String header; 062 protected Expression expression; 063 protected String uriDelimiter; 064 protected final CamelContext camelContext; 065 066 /** 067 * The iterator to be used for retrieving the next routing slip(s) to be used. 068 */ 069 protected interface RoutingSlipIterator { 070 071 /** 072 * Are the more routing slip(s)? 073 * 074 * @param exchange the current exchange 075 * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise. 076 */ 077 boolean hasNext(Exchange exchange); 078 079 /** 080 * Returns the next routing slip(s). 081 * 082 * @param exchange the current exchange 083 * @return the slip(s). 084 */ 085 Object next(Exchange exchange); 086 087 } 088 089 public RoutingSlip(CamelContext camelContext) { 090 notNull(camelContext, "camelContext"); 091 this.camelContext = camelContext; 092 } 093 094 public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) { 095 notNull(camelContext, "camelContext"); 096 notNull(expression, "expression"); 097 098 this.camelContext = camelContext; 099 this.expression = expression; 100 this.uriDelimiter = uriDelimiter; 101 this.header = null; 102 } 103 104 public void setDelimiter(String delimiter) { 105 this.uriDelimiter = delimiter; 106 } 107 108 public boolean isIgnoreInvalidEndpoints() { 109 return ignoreInvalidEndpoints; 110 } 111 112 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 113 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 114 } 115 116 @Override 117 public String toString() { 118 return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]"; 119 } 120 121 public String getTraceLabel() { 122 return "routingSlip[" + expression + "]"; 123 } 124 125 public void process(Exchange exchange) throws Exception { 126 AsyncProcessorHelper.process(this, exchange); 127 } 128 129 public boolean process(Exchange exchange, AsyncCallback callback) { 130 if (!isStarted()) { 131 exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this)); 132 callback.done(true); 133 return true; 134 } 135 136 return doRoutingSlip(exchange, callback); 137 } 138 139 public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) { 140 if (routingSlip instanceof Expression) { 141 this.expression = (Expression) routingSlip; 142 } else { 143 this.expression = ExpressionBuilder.constantExpression(routingSlip); 144 } 145 return doRoutingSlip(exchange, callback); 146 } 147 148 /** 149 * Creates the route slip iterator to be used. 150 * 151 * @param exchange the exchange 152 * @return the iterator, should never be <tt>null</tt> 153 */ 154 protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception { 155 Object slip = expression.evaluate(exchange, Object.class); 156 if (exchange.getException() != null) { 157 // force any exceptions occurred during evaluation to be thrown 158 throw exchange.getException(); 159 } 160 161 final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter); 162 163 return new RoutingSlipIterator() { 164 public boolean hasNext(Exchange exchange) { 165 return delegate.hasNext(); 166 } 167 168 public Object next(Exchange exchange) { 169 return delegate.next(); 170 } 171 }; 172 } 173 174 private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) { 175 Exchange current = exchange; 176 RoutingSlipIterator iter; 177 try { 178 iter = createRoutingSlipIterator(exchange); 179 } catch (Exception e) { 180 exchange.setException(e); 181 callback.done(true); 182 return true; 183 } 184 185 // ensure the slip is empty when we start 186 if (current.hasProperties()) { 187 current.setProperty(Exchange.SLIP_ENDPOINT, null); 188 } 189 190 while (iter.hasNext(current)) { 191 Endpoint endpoint; 192 try { 193 endpoint = resolveEndpoint(iter, exchange); 194 // if no endpoint was resolved then try the next 195 if (endpoint == null) { 196 continue; 197 } 198 } catch (Exception e) { 199 // error resolving endpoint so we should break out 200 current.setException(e); 201 break; 202 } 203 204 // prepare and process the routing slip 205 Exchange copy = prepareExchangeForRoutingSlip(current, endpoint); 206 boolean sync = processExchange(endpoint, copy, exchange, callback, iter); 207 current = copy; 208 209 if (!sync) { 210 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 211 // the remainder of the routing slip will be completed async 212 // so we break out now, then the callback will be invoked which then continue routing from where we left here 213 return false; 214 } 215 216 log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 217 218 // we ignore some kind of exceptions and allow us to continue 219 if (isIgnoreInvalidEndpoints()) { 220 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 221 if (e != null) { 222 if (log.isDebugEnabled()) { 223 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 224 } 225 current.setException(null); 226 } 227 } 228 229 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 230 // check for error if so we should break out 231 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 232 break; 233 } 234 } 235 236 // logging nextExchange as it contains the exchange that might have altered the payload and since 237 // we are logging the completion if will be confusing if we log the original instead 238 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 239 log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); 240 241 // copy results back to the original exchange 242 ExchangeHelper.copyResults(exchange, current); 243 244 callback.done(true); 245 return true; 246 } 247 248 protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception { 249 Object nextRecipient = iter.next(exchange); 250 Endpoint endpoint = null; 251 try { 252 endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient); 253 } catch (Exception e) { 254 if (isIgnoreInvalidEndpoints()) { 255 log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e); 256 } else { 257 throw e; 258 } 259 } 260 return endpoint; 261 } 262 263 protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) { 264 Exchange copy = new DefaultExchange(current); 265 // we must use the same id as this is a snapshot strategy where Camel copies a snapshot 266 // before processing the next step in the pipeline, so we have a snapshot of the exchange 267 // just before. This snapshot is used if Camel should do redeliveries (re try) using 268 // DeadLetterChannel. That is why it's important the id is the same, as it is the *same* 269 // exchange being routed. 270 copy.setExchangeId(current.getExchangeId()); 271 copyOutToIn(copy, current); 272 273 // ensure stream caching is reset 274 MessageHelper.resetStreamCache(copy.getIn()); 275 276 return copy; 277 } 278 279 protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original, 280 final AsyncCallback callback, final RoutingSlipIterator iter) { 281 282 // this does the actual processing so log at trace level 283 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 284 285 boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() { 286 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 287 ExchangePattern exchangePattern, final AsyncCallback callback) { 288 // set property which endpoint we send to 289 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 290 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); 291 292 return asyncProducer.process(exchange, new AsyncCallback() { 293 public void done(boolean doneSync) { 294 // we only have to handle async completion of the routing slip 295 if (doneSync) { 296 callback.done(doneSync); 297 return; 298 } 299 300 // continue processing the routing slip asynchronously 301 Exchange current = exchange; 302 303 while (iter.hasNext(current)) { 304 305 // we ignore some kind of exceptions and allow us to continue 306 if (isIgnoreInvalidEndpoints()) { 307 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 308 if (e != null) { 309 if (log.isDebugEnabled()) { 310 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 311 } 312 current.setException(null); 313 } 314 } 315 316 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 317 // check for error if so we should break out 318 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 319 break; 320 } 321 322 Endpoint endpoint; 323 try { 324 endpoint = resolveEndpoint(iter, exchange); 325 // if no endpoint was resolved then try the next 326 if (endpoint == null) { 327 continue; 328 } 329 } catch (Exception e) { 330 // error resolving endpoint so we should break out 331 exchange.setException(e); 332 break; 333 } 334 335 // prepare and process the routing slip 336 Exchange copy = prepareExchangeForRoutingSlip(current, endpoint); 337 boolean sync = processExchange(endpoint, copy, original, callback, iter); 338 current = copy; 339 340 if (!sync) { 341 log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 342 return; 343 } 344 } 345 346 // logging nextExchange as it contains the exchange that might have altered the payload and since 347 // we are logging the completion if will be confusing if we log the original instead 348 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 349 log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); 350 351 // copy results back to the original exchange 352 ExchangeHelper.copyResults(original, current); 353 callback.done(false); 354 } 355 }); 356 } 357 }); 358 359 return sync; 360 } 361 362 protected void doStart() throws Exception { 363 if (producerCache == null) { 364 producerCache = new ProducerCache(this, camelContext); 365 } 366 ServiceHelper.startService(producerCache); 367 } 368 369 protected void doStop() throws Exception { 370 ServiceHelper.stopService(producerCache); 371 } 372 373 protected void doShutdown() throws Exception { 374 ServiceHelper.stopAndShutdownService(producerCache); 375 } 376 377 /** 378 * Returns the outbound message if available. Otherwise return the inbound message. 379 */ 380 private Message getResultMessage(Exchange exchange) { 381 if (exchange.hasOut()) { 382 return exchange.getOut(); 383 } else { 384 // if this endpoint had no out (like a mock endpoint) just take the in 385 return exchange.getIn(); 386 } 387 } 388 389 /** 390 * Copy the outbound data in 'source' to the inbound data in 'result'. 391 */ 392 private void copyOutToIn(Exchange result, Exchange source) { 393 result.setException(source.getException()); 394 395 if (source.hasOut() && source.getOut().isFault()) { 396 result.getOut().copyFrom(source.getOut()); 397 } 398 399 result.setIn(getResultMessage(source)); 400 401 result.getProperties().clear(); 402 result.getProperties().putAll(source.getProperties()); 403 } 404 }