001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.AsyncProcessor; 021 import org.apache.camel.CamelExchangeException; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.PollingConsumer; 024 import org.apache.camel.processor.aggregate.AggregationStrategy; 025 import org.apache.camel.support.ServiceSupport; 026 import org.apache.camel.util.AsyncProcessorHelper; 027 import org.apache.camel.util.ExchangeHelper; 028 import org.apache.camel.util.ServiceHelper; 029 import org.slf4j.Logger; 030 import org.slf4j.LoggerFactory; 031 032 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; 033 034 /** 035 * A content enricher that enriches input data by first obtaining additional 036 * data from a <i>resource</i> represented by an endpoint <code>producer</code> 037 * and second by aggregating input data and additional data. Aggregation of 038 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy} 039 * object. 040 * <p/> 041 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher} 042 * that uses a {@link org.apache.camel.Producer}. 043 * 044 * @see Enricher 045 */ 046 public class PollEnricher extends ServiceSupport implements AsyncProcessor { 047 048 private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); 049 private AggregationStrategy aggregationStrategy; 050 private PollingConsumer consumer; 051 private long timeout; 052 053 /** 054 * Creates a new {@link PollEnricher}. The default aggregation strategy is to 055 * copy the additional data obtained from the enricher's resource over the 056 * input data. When using the copy aggregation strategy the enricher 057 * degenerates to a normal transformer. 058 * 059 * @param consumer consumer to resource endpoint. 060 */ 061 public PollEnricher(PollingConsumer consumer) { 062 this(defaultAggregationStrategy(), consumer, 0); 063 } 064 065 /** 066 * Creates a new {@link PollEnricher}. 067 * 068 * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. 069 * @param consumer consumer to resource endpoint. 070 * @param timeout timeout in millis 071 */ 072 public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) { 073 this.aggregationStrategy = aggregationStrategy; 074 this.consumer = consumer; 075 this.timeout = timeout; 076 } 077 078 /** 079 * Sets the aggregation strategy for this poll enricher. 080 * 081 * @param aggregationStrategy the aggregationStrategy to set 082 */ 083 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 084 this.aggregationStrategy = aggregationStrategy; 085 } 086 087 /** 088 * Sets the default aggregation strategy for this poll enricher. 089 */ 090 public void setDefaultAggregationStrategy() { 091 this.aggregationStrategy = defaultAggregationStrategy(); 092 } 093 094 /** 095 * Sets the timeout to use when polling. 096 * <p/> 097 * Use 0 to use receiveNoWait, 098 * Use -1 to use receive with no timeout (which will block until data is available). 099 * 100 * @param timeout timeout in millis. 101 */ 102 public void setTimeout(long timeout) { 103 this.timeout = timeout; 104 } 105 106 public void process(Exchange exchange) throws Exception { 107 AsyncProcessorHelper.process(this, exchange); 108 } 109 110 /** 111 * Enriches the input data (<code>exchange</code>) by first obtaining 112 * additional data from an endpoint represented by an endpoint 113 * <code>producer</code> and second by aggregating input data and additional 114 * data. Aggregation of input data and additional data is delegated to an 115 * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the 116 * message exchange with the resource endpoint fails then no aggregation 117 * will be done and the failed exchange content is copied over to the 118 * original message exchange. 119 * 120 * @param exchange input data. 121 */ 122 @Override 123 public boolean process(Exchange exchange, AsyncCallback callback) { 124 try { 125 preCheckPoll(exchange); 126 } catch (Exception e) { 127 exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e)); 128 callback.done(true); 129 return true; 130 } 131 132 Exchange resourceExchange; 133 if (timeout < 0) { 134 LOG.debug("Consumer receive: {}", consumer); 135 resourceExchange = consumer.receive(); 136 } else if (timeout == 0) { 137 LOG.debug("Consumer receiveNoWait: {}", consumer); 138 resourceExchange = consumer.receiveNoWait(); 139 } else { 140 LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); 141 resourceExchange = consumer.receive(timeout); 142 } 143 144 if (resourceExchange == null) { 145 LOG.debug("Consumer received no exchange"); 146 } else { 147 LOG.debug("Consumer received: {}", resourceExchange); 148 } 149 150 if (resourceExchange != null && resourceExchange.isFailed()) { 151 // copy resource exchange onto original exchange (preserving pattern) 152 copyResultsPreservePattern(exchange, resourceExchange); 153 } else { 154 prepareResult(exchange); 155 156 try { 157 // prepare the exchanges for aggregation 158 ExchangeHelper.prepareAggregation(exchange, resourceExchange); 159 // must catch any exception from aggregation 160 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); 161 if (aggregatedExchange != null) { 162 // copy aggregation result onto original exchange (preserving pattern) 163 copyResultsPreservePattern(exchange, aggregatedExchange); 164 // handover any synchronization 165 if (resourceExchange != null) { 166 resourceExchange.handoverCompletions(exchange); 167 } 168 } 169 } catch (Throwable e) { 170 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); 171 callback.done(true); 172 return true; 173 } 174 } 175 176 // set header with the uri of the endpoint enriched so we can use that for tracing etc 177 if (exchange.hasOut()) { 178 exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 179 } else { 180 exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); 181 } 182 183 callback.done(true); 184 return true; 185 } 186 187 /** 188 * Strategy to pre check polling. 189 * <p/> 190 * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also 191 * started from a file based endpoint as that is not currently supported. 192 * 193 * @param exchange the current exchange 194 */ 195 protected void preCheckPoll(Exchange exchange) throws Exception { 196 // noop 197 } 198 199 private static void prepareResult(Exchange exchange) { 200 if (exchange.getPattern().isOutCapable()) { 201 exchange.getOut().copyFrom(exchange.getIn()); 202 } 203 } 204 205 private static AggregationStrategy defaultAggregationStrategy() { 206 return new CopyAggregationStrategy(); 207 } 208 209 @Override 210 public String toString() { 211 return "PollEnrich[" + consumer + "]"; 212 } 213 214 protected void doStart() throws Exception { 215 ServiceHelper.startServices(aggregationStrategy, consumer); 216 } 217 218 protected void doStop() throws Exception { 219 ServiceHelper.stopServices(consumer, aggregationStrategy); 220 } 221 222 private static class CopyAggregationStrategy implements AggregationStrategy { 223 224 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 225 if (newExchange != null) { 226 copyResultsPreservePattern(oldExchange, newExchange); 227 } else { 228 // if no newExchange then there was no message from the external resource 229 // and therefore we should set an empty body to indicate this fact 230 // but keep headers/attachments as we want to propagate those 231 oldExchange.getIn().setBody(null); 232 oldExchange.setOut(null); 233 } 234 return oldExchange; 235 } 236 237 } 238 239 }