001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.AsyncProcessor; 021 import org.apache.camel.CamelExchangeException; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.ExchangePattern; 024 import org.apache.camel.Producer; 025 import org.apache.camel.impl.DefaultExchange; 026 import org.apache.camel.processor.aggregate.AggregationStrategy; 027 import org.apache.camel.support.ServiceSupport; 028 import org.apache.camel.util.AsyncProcessorConverterHelper; 029 import org.apache.camel.util.AsyncProcessorHelper; 030 import org.apache.camel.util.ExchangeHelper; 031 import org.apache.camel.util.ServiceHelper; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 036 037 /** 038 * A content enricher that enriches input data by first obtaining additional 039 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 040 * and second by aggregating input data and additional data. Aggregation of 041 * input data and additional data is delegated to an {@link AggregationStrategy} 042 * object. 043 * <p/> 044 * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher} 045 * that uses a {@link org.apache.camel.PollingConsumer}. 046 * 047 * @see PollEnricher 048 */ 049 public class Enricher extends ServiceSupport implements AsyncProcessor { 050 051 private static final Logger LOG = LoggerFactory.getLogger(Enricher.class); 052 private AggregationStrategy aggregationStrategy; 053 private Producer producer; 054 055 /** 056 * Creates a new {@link Enricher}. The default aggregation strategy is to 057 * copy the additional data obtained from the enricher's resource over the 058 * input data. When using the copy aggregation strategy the enricher 059 * degenerates to a normal transformer. 060 * 061 * @param producer producer to resource endpoint. 062 */ 063 public Enricher(Producer producer) { 064 this(defaultAggregationStrategy(), producer); 065 } 066 067 /** 068 * Creates a new {@link Enricher}. 069 * 070 * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 071 * @param producer producer to resource endpoint. 072 */ 073 public Enricher(AggregationStrategy aggregationStrategy, Producer producer) { 074 this.aggregationStrategy = aggregationStrategy; 075 this.producer = producer; 076 } 077 078 /** 079 * Sets the aggregation strategy for this enricher. 080 * 081 * @param aggregationStrategy the aggregationStrategy to set 082 */ 083 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 084 this.aggregationStrategy = aggregationStrategy; 085 } 086 087 /** 088 * Sets the default aggregation strategy for this enricher. 089 */ 090 public void setDefaultAggregationStrategy() { 091 this.aggregationStrategy = defaultAggregationStrategy(); 092 } 093 094 public void process(Exchange exchange) throws Exception { 095 AsyncProcessorHelper.process(this, exchange); 096 } 097 098 /** 099 * Enriches the input data (<code>exchange</code>) by first obtaining 100 * additional data from an endpoint represented by an endpoint 101 * <code>producer</code> and second by aggregating input data and additional 102 * data. Aggregation of input data and additional data is delegated to an 103 * {@link AggregationStrategy} object set at construction time. If the 104 * message exchange with the resource endpoint fails then no aggregation 105 * will be done and the failed exchange content is copied over to the 106 * original message exchange. 107 * 108 * @param exchange input data. 109 */ 110 public boolean process(final Exchange exchange, final AsyncCallback callback) { 111 final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); 112 113 AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer); 114 boolean sync = ap.process(resourceExchange, new AsyncCallback() { 115 public void done(boolean doneSync) { 116 // we only have to handle async completion of the routing slip 117 if (doneSync) { 118 return; 119 } 120 121 if (resourceExchange.isFailed()) { 122 // copy resource exchange onto original exchange (preserving pattern) 123 copyResultsPreservePattern(exchange, resourceExchange); 124 } else { 125 prepareResult(exchange); 126 try { 127 // prepare the exchanges for aggregation 128 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 129 130 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 131 if (aggregatedExchange != null) { 132 // copy aggregation result onto original exchange (preserving pattern) 133 copyResultsPreservePattern(exchange, aggregatedExchange); 134 } 135 } catch (Throwable e) { 136 // if the aggregationStrategy threw an exception, set it on the original exchange 137 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 138 callback.done(false); 139 // we failed so break out now 140 return; 141 } 142 } 143 144 // set property with the uri of the endpoint enriched so we can use that for tracing etc 145 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 146 147 callback.done(false); 148 } 149 }); 150 151 if (!sync) { 152 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 153 // the remainder of the routing slip will be completed async 154 // so we break out now, then the callback will be invoked which then continue routing from where we left here 155 return false; 156 } 157 158 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 159 160 if (resourceExchange.isFailed()) { 161 // copy resource exchange onto original exchange (preserving pattern) 162 copyResultsPreservePattern(exchange, resourceExchange); 163 } else { 164 prepareResult(exchange); 165 166 try { 167 // prepare the exchanges for aggregation 168 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 169 170 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 171 if (aggregatedExchange != null) { 172 // copy aggregation result onto original exchange (preserving pattern) 173 copyResultsPreservePattern(exchange, aggregatedExchange); 174 } 175 } catch (Throwable e) { 176 // if the aggregationStrategy threw an exception, set it on the original exchange 177 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 178 callback.done(true); 179 // we failed so break out now 180 return true; 181 } 182 } 183 184 // set property with the uri of the endpoint enriched so we can use that for tracing etc 185 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 186 187 callback.done(true); 188 return true; 189 } 190 191 /** 192 * Creates a new {@link DefaultExchange} instance from the given 193 * <code>exchange</code>. The resulting exchange's pattern is defined by 194 * <code>pattern</code>. 195 * 196 * @param source exchange to copy from. 197 * @param pattern exchange pattern to set. 198 * @return created exchange. 199 */ 200 protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) { 201 // copy exchange, and do not share the unit of work 202 Exchange target = ExchangeHelper.createCorrelatedCopy(source, false); 203 target.setPattern(pattern); 204 return target; 205 } 206 207 private static void prepareResult(Exchange exchange) { 208 if (exchange.getPattern().isOutCapable()) { 209 exchange.getOut().copyFrom(exchange.getIn()); 210 } 211 } 212 213 private static AggregationStrategy defaultAggregationStrategy() { 214 return new CopyAggregationStrategy(); 215 } 216 217 @Override 218 public String toString() { 219 return "Enrich[" + producer.getEndpoint() + "]"; 220 } 221 222 protected void doStart() throws Exception { 223 ServiceHelper.startServices(aggregationStrategy, producer); 224 } 225 226 protected void doStop() throws Exception { 227 ServiceHelper.stopServices(producer, aggregationStrategy); 228 } 229 230 private static class CopyAggregationStrategy implements AggregationStrategy { 231 232 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 233 if (newExchange != null) { 234 copyResultsPreservePattern(oldExchange, newExchange); 235 } 236 return oldExchange; 237 } 238 239 } 240 241 }