001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.concurrent.ExecutorService; 023 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Processor; 028 import org.apache.camel.Producer; 029 import org.apache.camel.impl.ProducerCache; 030 import org.apache.camel.processor.aggregate.AggregationStrategy; 031 import org.apache.camel.spi.RouteContext; 032 import org.apache.camel.util.ExchangeHelper; 033 import org.apache.camel.util.MessageHelper; 034 import org.apache.camel.util.ServiceHelper; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * Implements a dynamic <a 040 * href="http://camel.apache.org/recipient-list.html">Recipient List</a> 041 * pattern where the list of actual endpoints to send a message exchange to are 042 * dependent on some dynamic expression. 043 * <p/> 044 * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based 045 * on recipient lists. This implementation have to handle the fact the processors is not known at design time 046 * but evaluated at runtime from the dynamic recipient list. Therefore this implementation have to at runtime 047 * lookup endpoints and create producers which should act as the processors for the multicast processors which 048 * runs under the hood. Also this implementation supports the asynchronous routing engine which makes the code 049 * more trickier. 050 * 051 * @version 052 */ 053 public class RecipientListProcessor extends MulticastProcessor { 054 055 private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class); 056 private final Iterator<Object> iter; 057 private boolean ignoreInvalidEndpoints; 058 private ProducerCache producerCache; 059 060 /** 061 * Class that represent each step in the recipient list to do 062 * <p/> 063 * This implementation ensures the provided producer is being released back in the producer cache when 064 * its done using it. 065 */ 066 static final class RecipientProcessorExchangePair implements ProcessorExchangePair { 067 private final int index; 068 private final Endpoint endpoint; 069 private final Producer producer; 070 private Processor prepared; 071 private final Exchange exchange; 072 private final ProducerCache producerCache; 073 074 private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer, 075 Processor prepared, Exchange exchange) { 076 this.index = index; 077 this.producerCache = producerCache; 078 this.endpoint = endpoint; 079 this.producer = producer; 080 this.prepared = prepared; 081 this.exchange = exchange; 082 } 083 084 public int getIndex() { 085 return index; 086 } 087 088 public Exchange getExchange() { 089 return exchange; 090 } 091 092 public Producer getProducer() { 093 return producer; 094 } 095 096 public Processor getProcessor() { 097 return prepared; 098 } 099 100 public void begin() { 101 // we have already acquired and prepare the producer 102 LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange); 103 exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri()); 104 // ensure stream caching is reset 105 MessageHelper.resetStreamCache(exchange.getIn()); 106 } 107 108 public void done() { 109 LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange); 110 // when we are done we should release back in pool 111 try { 112 producerCache.releaseProducer(endpoint, producer); 113 } catch (Exception e) { 114 if (LOG.isDebugEnabled()) { 115 LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e); 116 } 117 } 118 } 119 120 } 121 122 public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) { 123 super(camelContext, null); 124 this.producerCache = producerCache; 125 this.iter = iter; 126 } 127 128 public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) { 129 super(camelContext, null, aggregationStrategy); 130 this.producerCache = producerCache; 131 this.iter = iter; 132 } 133 134 public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy, 135 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 136 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { 137 super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, 138 streaming, stopOnException, timeout, onPrepare, shareUnitOfWork); 139 this.producerCache = producerCache; 140 this.iter = iter; 141 } 142 143 public boolean isIgnoreInvalidEndpoints() { 144 return ignoreInvalidEndpoints; 145 } 146 147 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 148 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 149 } 150 151 @Override 152 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 153 // here we iterate the recipient lists and create the exchange pair for each of those 154 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(); 155 156 // at first we must lookup the endpoint and acquire the producer which can send to the endpoint 157 int index = 0; 158 while (iter.hasNext()) { 159 Object recipient = iter.next(); 160 Endpoint endpoint; 161 Producer producer; 162 try { 163 endpoint = resolveEndpoint(exchange, recipient); 164 producer = producerCache.acquireProducer(endpoint); 165 } catch (Exception e) { 166 if (isIgnoreInvalidEndpoints()) { 167 if (LOG.isDebugEnabled()) { 168 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 169 } 170 continue; 171 } else { 172 // failure so break out 173 throw e; 174 } 175 } 176 177 // then create the exchange pair 178 result.add(createProcessorExchangePair(index++, endpoint, producer, exchange)); 179 } 180 181 return result; 182 } 183 184 /** 185 * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead 186 */ 187 protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) { 188 Processor prepared = producer; 189 190 // copy exchange, and do not share the unit of work 191 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 192 193 // if we share unit of work, we need to prepare the child exchange 194 if (isShareUnitOfWork()) { 195 prepareSharedUnitOfWork(copy, exchange); 196 } 197 198 // set property which endpoint we send to 199 setToEndpoint(copy, prepared); 200 201 // rework error handling to support fine grained error handling 202 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 203 prepared = createErrorHandler(routeContext, copy, prepared); 204 205 // invoke on prepare on the exchange if specified 206 if (onPrepare != null) { 207 try { 208 onPrepare.process(copy); 209 } catch (Exception e) { 210 copy.setException(e); 211 } 212 } 213 214 // and create the pair 215 return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy); 216 } 217 218 protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) { 219 // trim strings as end users might have added spaces between separators 220 if (recipient instanceof String) { 221 recipient = ((String) recipient).trim(); 222 } 223 return ExchangeHelper.resolveEndpoint(exchange, recipient); 224 } 225 226 protected void doStart() throws Exception { 227 super.doStart(); 228 if (producerCache == null) { 229 producerCache = new ProducerCache(this, getCamelContext()); 230 } 231 ServiceHelper.startService(producerCache); 232 } 233 234 protected void doStop() throws Exception { 235 ServiceHelper.stopService(producerCache); 236 super.doStop(); 237 } 238 239 protected void doShutdown() throws Exception { 240 ServiceHelper.stopAndShutdownService(producerCache); 241 super.doShutdown(); 242 } 243 244 @Override 245 public String toString() { 246 return "RecipientList"; 247 } 248 249 @Override 250 public String getTraceLabel() { 251 return "recipientList"; 252 } 253 }