001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.AsyncProcessor;
021    import org.apache.camel.CamelExchangeException;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.PollingConsumer;
024    import org.apache.camel.processor.aggregate.AggregationStrategy;
025    import org.apache.camel.support.ServiceSupport;
026    import org.apache.camel.util.AsyncProcessorHelper;
027    import org.apache.camel.util.ExchangeHelper;
028    import org.apache.camel.util.ServiceHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
033    
/**
 * A content enricher that enriches input data by first obtaining additional
 * data from a <i>resource</i> represented by an endpoint <code>consumer</code>
 * and second by aggregating input data and additional data. Aggregation of
 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
 * object.
 * <p/>
 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
 * that uses a {@link org.apache.camel.Producer}.
 *
 * @see Enricher
 */
046    public class PollEnricher extends ServiceSupport implements AsyncProcessor {
047    
048        private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
049        private AggregationStrategy aggregationStrategy;
050        private PollingConsumer consumer;
051        private long timeout;
052    
053        /**
054         * Creates a new {@link PollEnricher}. The default aggregation strategy is to
055         * copy the additional data obtained from the enricher's resource over the
056         * input data. When using the copy aggregation strategy the enricher
057         * degenerates to a normal transformer.
058         *
059         * @param consumer consumer to resource endpoint.
060         */
061        public PollEnricher(PollingConsumer consumer) {
062            this(defaultAggregationStrategy(), consumer, 0);
063        }
064    
065        /**
066         * Creates a new {@link PollEnricher}.
067         *
068         * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
069         * @param consumer consumer to resource endpoint.
070         * @param timeout timeout in millis
071         */
072        public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
073            this.aggregationStrategy = aggregationStrategy;
074            this.consumer = consumer;
075            this.timeout = timeout;
076        }
077    
078        /**
079         * Sets the aggregation strategy for this poll enricher.
080         *
081         * @param aggregationStrategy the aggregationStrategy to set
082         */
083        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
084            this.aggregationStrategy = aggregationStrategy;
085        }
086    
087        /**
088         * Sets the default aggregation strategy for this poll enricher.
089         */
090        public void setDefaultAggregationStrategy() {
091            this.aggregationStrategy = defaultAggregationStrategy();
092        }
093    
094        /**
095         * Sets the timeout to use when polling.
096         * <p/>
097         * Use 0 to use receiveNoWait,
098         * Use -1 to use receive with no timeout (which will block until data is available).
099         *
100         * @param timeout timeout in millis.
101         */
102        public void setTimeout(long timeout) {
103            this.timeout = timeout;
104        }
105    
106        public void process(Exchange exchange) throws Exception {
107            AsyncProcessorHelper.process(this, exchange);
108        }
109    
110        /**
111         * Enriches the input data (<code>exchange</code>) by first obtaining
112         * additional data from an endpoint represented by an endpoint
113         * <code>producer</code> and second by aggregating input data and additional
114         * data. Aggregation of input data and additional data is delegated to an
115         * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
116         * message exchange with the resource endpoint fails then no aggregation
117         * will be done and the failed exchange content is copied over to the
118         * original message exchange.
119         *
120         * @param exchange input data.
121         */
122        @Override
123        public boolean process(Exchange exchange, AsyncCallback callback) {
124            try {
125                preCheckPoll(exchange);
126            } catch (Exception e) {
127                exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e));
128                callback.done(true);
129                return true;
130            }
131    
132            Exchange resourceExchange;
133            if (timeout < 0) {
134                LOG.debug("Consumer receive: {}", consumer);
135                resourceExchange = consumer.receive();
136            } else if (timeout == 0) {
137                LOG.debug("Consumer receiveNoWait: {}", consumer);
138                resourceExchange = consumer.receiveNoWait();
139            } else {
140                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
141                resourceExchange = consumer.receive(timeout);
142            }
143    
144            if (resourceExchange == null) {
145                LOG.debug("Consumer received no exchange");
146            } else {
147                LOG.debug("Consumer received: {}", resourceExchange);
148            }
149    
150            if (resourceExchange != null && resourceExchange.isFailed()) {
151                // copy resource exchange onto original exchange (preserving pattern)
152                copyResultsPreservePattern(exchange, resourceExchange);
153            } else {
154                prepareResult(exchange);
155    
156                try {
157                    // prepare the exchanges for aggregation
158                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
159                    // must catch any exception from aggregation
160                    Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
161                    if (aggregatedExchange != null) {
162                        // copy aggregation result onto original exchange (preserving pattern)
163                        copyResultsPreservePattern(exchange, aggregatedExchange);
164                        // handover any synchronization
165                        if (resourceExchange != null) {
166                            resourceExchange.handoverCompletions(exchange);
167                        }
168                    }
169                } catch (Throwable e) {
170                    exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
171                    callback.done(true);
172                    return true;
173                }
174            }
175    
176            // set header with the uri of the endpoint enriched so we can use that for tracing etc
177            if (exchange.hasOut()) {
178                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
179            } else {
180                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
181            }
182    
183            callback.done(true);
184            return true;
185        }
186    
187        /**
188         * Strategy to pre check polling.
189         * <p/>
190         * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
191         * started from a file based endpoint as that is not currently supported.
192         *
193         * @param exchange the current exchange
194         */
195        protected void preCheckPoll(Exchange exchange) throws Exception {
196            // noop
197        }
198    
199        private static void prepareResult(Exchange exchange) {
200            if (exchange.getPattern().isOutCapable()) {
201                exchange.getOut().copyFrom(exchange.getIn());
202            }
203        }
204    
205        private static AggregationStrategy defaultAggregationStrategy() {
206            return new CopyAggregationStrategy();
207        }
208    
209        @Override
210        public String toString() {
211            return "PollEnrich[" + consumer + "]";
212        }
213    
214        protected void doStart() throws Exception {
215            ServiceHelper.startServices(aggregationStrategy, consumer);
216        }
217    
218        protected void doStop() throws Exception {
219            ServiceHelper.stopServices(consumer, aggregationStrategy);
220        }
221    
222        private static class CopyAggregationStrategy implements AggregationStrategy {
223    
224            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
225                if (newExchange != null) {
226                    copyResultsPreservePattern(oldExchange, newExchange);
227                } else {
228                    // if no newExchange then there was no message from the external resource
229                    // and therefore we should set an empty body to indicate this fact
230                    // but keep headers/attachments as we want to propagate those
231                    oldExchange.getIn().setBody(null);
232                    oldExchange.setOut(null);
233                }
234                return oldExchange;
235            }
236    
237        }
238    
239    }