001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.AsyncProcessor;
021    import org.apache.camel.CamelExchangeException;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.ExchangePattern;
024    import org.apache.camel.Producer;
025    import org.apache.camel.impl.DefaultExchange;
026    import org.apache.camel.processor.aggregate.AggregationStrategy;
027    import org.apache.camel.support.ServiceSupport;
028    import org.apache.camel.util.AsyncProcessorConverterHelper;
029    import org.apache.camel.util.AsyncProcessorHelper;
030    import org.apache.camel.util.ExchangeHelper;
031    import org.apache.camel.util.ServiceHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
036    
037    /**
038     * A content enricher that enriches input data by first obtaining additional
039     * data from a <i>resource</i> represented by an endpoint <code>producer</code>
040     * and second by aggregating input data and additional data. Aggregation of
041     * input data and additional data is delegated to an {@link AggregationStrategy}
042     * object.
043     * <p/>
044     * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher}
045     * that uses a {@link org.apache.camel.PollingConsumer}.
046     *
047     * @see PollEnricher
048     */
049    public class Enricher extends ServiceSupport implements AsyncProcessor {
050    
051        private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);
052        private AggregationStrategy aggregationStrategy;
053        private Producer producer;
054    
055        /**
056         * Creates a new {@link Enricher}. The default aggregation strategy is to
057         * copy the additional data obtained from the enricher's resource over the
058         * input data. When using the copy aggregation strategy the enricher
059         * degenerates to a normal transformer.
060         * 
061         * @param producer producer to resource endpoint.
062         */
063        public Enricher(Producer producer) {
064            this(defaultAggregationStrategy(), producer);
065        }
066    
067        /**
068         * Creates a new {@link Enricher}.
069         * 
070         * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
071         * @param producer producer to resource endpoint.
072         */
073        public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
074            this.aggregationStrategy = aggregationStrategy;
075            this.producer = producer;
076        }
077    
078        /**
079         * Sets the aggregation strategy for this enricher.
080         *
081         * @param aggregationStrategy the aggregationStrategy to set
082         */
083        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
084            this.aggregationStrategy = aggregationStrategy;
085        }
086    
087        /**
088         * Sets the default aggregation strategy for this enricher.
089         */
090        public void setDefaultAggregationStrategy() {
091            this.aggregationStrategy = defaultAggregationStrategy();
092        }
093    
094        public void process(Exchange exchange) throws Exception {
095            AsyncProcessorHelper.process(this, exchange);
096        }
097    
098        /**
099         * Enriches the input data (<code>exchange</code>) by first obtaining
100         * additional data from an endpoint represented by an endpoint
101         * <code>producer</code> and second by aggregating input data and additional
102         * data. Aggregation of input data and additional data is delegated to an
103         * {@link AggregationStrategy} object set at construction time. If the
104         * message exchange with the resource endpoint fails then no aggregation
105         * will be done and the failed exchange content is copied over to the
106         * original message exchange.
107         *
108         * @param exchange input data.
109         */
110        public boolean process(final Exchange exchange, final AsyncCallback callback) {
111            final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
112    
113            AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
114            boolean sync = ap.process(resourceExchange, new AsyncCallback() {
115                public void done(boolean doneSync) {
116                    // we only have to handle async completion of the routing slip
117                    if (doneSync) {
118                        return;
119                    }
120    
121                    if (resourceExchange.isFailed()) {
122                        // copy resource exchange onto original exchange (preserving pattern)
123                        copyResultsPreservePattern(exchange, resourceExchange);
124                    } else {
125                        prepareResult(exchange);
126                        try {
127                            // prepare the exchanges for aggregation
128                            ExchangeHelper.prepareAggregation(exchange, resourceExchange);
129    
130                            Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
131                            if (aggregatedExchange != null) {
132                                // copy aggregation result onto original exchange (preserving pattern)
133                                copyResultsPreservePattern(exchange, aggregatedExchange);
134                            }
135                        } catch (Throwable e) {
136                            // if the aggregationStrategy threw an exception, set it on the original exchange
137                            exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
138                            callback.done(false);
139                            // we failed so break out now
140                            return;
141                        }
142                    }
143    
144                    // set property with the uri of the endpoint enriched so we can use that for tracing etc
145                    exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
146    
147                    callback.done(false);
148                }
149            });
150    
151            if (!sync) {
152                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
153                // the remainder of the routing slip will be completed async
154                // so we break out now, then the callback will be invoked which then continue routing from where we left here
155                return false;
156            }
157    
158            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
159    
160            if (resourceExchange.isFailed()) {
161                // copy resource exchange onto original exchange (preserving pattern)
162                copyResultsPreservePattern(exchange, resourceExchange);
163            } else {
164                prepareResult(exchange);
165    
166                try {
167                    // prepare the exchanges for aggregation
168                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
169    
170                    Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
171                    if (aggregatedExchange != null) {
172                        // copy aggregation result onto original exchange (preserving pattern)
173                        copyResultsPreservePattern(exchange, aggregatedExchange);
174                    }
175                } catch (Throwable e) {
176                    // if the aggregationStrategy threw an exception, set it on the original exchange
177                    exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
178                    callback.done(true);
179                    // we failed so break out now
180                    return true;
181                }
182            }
183    
184            // set property with the uri of the endpoint enriched so we can use that for tracing etc
185            exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
186    
187            callback.done(true);
188            return true;
189        }
190    
191        /**
192         * Creates a new {@link DefaultExchange} instance from the given
193         * <code>exchange</code>. The resulting exchange's pattern is defined by
194         * <code>pattern</code>.
195         *
196         * @param source  exchange to copy from.
197         * @param pattern exchange pattern to set.
198         * @return created exchange.
199         */
200        protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
201            // copy exchange, and do not share the unit of work
202            Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
203            target.setPattern(pattern);
204            return target;
205        }
206    
207        private static void prepareResult(Exchange exchange) {
208            if (exchange.getPattern().isOutCapable()) {
209                exchange.getOut().copyFrom(exchange.getIn());
210            }
211        }
212    
213        private static AggregationStrategy defaultAggregationStrategy() {
214            return new CopyAggregationStrategy();
215        }
216    
217        @Override
218        public String toString() {
219            return "Enrich[" + producer.getEndpoint() + "]";
220        }
221    
222        protected void doStart() throws Exception {
223            ServiceHelper.startServices(aggregationStrategy, producer);
224        }
225    
226        protected void doStop() throws Exception {
227            ServiceHelper.stopServices(producer, aggregationStrategy);
228        }
229    
230        private static class CopyAggregationStrategy implements AggregationStrategy {
231    
232            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
233                if (newExchange != null) {
234                    copyResultsPreservePattern(oldExchange, newExchange);
235                }
236                return oldExchange;
237            }
238    
239        }
240    
241    }