001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import javax.xml.bind.annotation.XmlAccessType; 020 import javax.xml.bind.annotation.XmlAccessorType; 021 import javax.xml.bind.annotation.XmlAttribute; 022 import javax.xml.bind.annotation.XmlRootElement; 023 import javax.xml.bind.annotation.XmlTransient; 024 025 import org.apache.camel.CamelContextAware; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Processor; 028 import org.apache.camel.processor.PollEnricher; 029 import org.apache.camel.processor.aggregate.AggregationStrategy; 030 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 031 import org.apache.camel.spi.RouteContext; 032 import org.apache.camel.util.ObjectHelper; 033 034 /** 035 * Represents an XML <pollEnrich/> element 036 * 037 * @see org.apache.camel.processor.Enricher 038 */ 039 @XmlRootElement(name = "pollEnrich") 040 @XmlAccessorType(XmlAccessType.FIELD) 041 public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition { 042 @XmlAttribute(name = "uri") 043 private String resourceUri; 044 // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref: 045 @XmlAttribute(name = "ref") 046 private String resourceRef; 047 @XmlAttribute 048 private Long timeout; 049 @XmlAttribute(name = "strategyRef") 050 private String aggregationStrategyRef; 051 @XmlAttribute(name = "strategyMethodName") 052 private String aggregationStrategyMethodName; 053 @XmlAttribute(name = "strategyMethodAllowNull") 054 private Boolean aggregationStrategyMethodAllowNull; 055 @XmlTransient 056 private AggregationStrategy aggregationStrategy; 057 058 public PollEnrichDefinition() { 059 } 060 061 public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) { 062 this.aggregationStrategy = aggregationStrategy; 063 this.resourceUri = resourceUri; 064 this.timeout = timeout; 065 } 066 067 @Override 068 public String toString() { 069 return "PollEnrich[" + description() + " " + aggregationStrategy + "]"; 070 } 071 072 protected String description() { 073 return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null); 074 } 075 076 @Override 077 public String getShortName() { 078 return "pollEnrich"; 079 } 080 081 @Override 082 public String getLabel() { 083 return "pollEnrich[" + description() + "]"; 084 } 085 086 @Override 087 public String getEndpointUri() { 088 if (resourceUri != null) { 089 return resourceUri; 090 } else { 091 return null; 092 } 093 } 094 095 @Override 096 public Processor createProcessor(RouteContext routeContext) throws Exception { 097 if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) { 098 throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint"); 099 } 100 101 // lookup endpoint 102 Endpoint endpoint; 103 if (resourceUri != null) { 104 endpoint = routeContext.resolveEndpoint(resourceUri); 105 } else { 106 endpoint = routeContext.resolveEndpoint(null, resourceRef); 107 } 108 109 PollEnricher enricher; 110 if (timeout != null) { 111 enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout); 112 } else { 113 // if no timeout then we should block, and there use a negative timeout 114 enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1); 115 } 116 117 AggregationStrategy strategy = createAggregationStrategy(routeContext); 118 if (strategy == null) { 119 enricher.setDefaultAggregationStrategy(); 120 } else { 121 enricher.setAggregationStrategy(strategy); 122 } 123 124 return enricher; 125 } 126 127 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 128 AggregationStrategy strategy = getAggregationStrategy(); 129 if (strategy == null && aggregationStrategyRef != null) { 130 Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class); 131 if (aggStrategy instanceof AggregationStrategy) { 132 strategy = (AggregationStrategy) aggStrategy; 133 } else if (aggStrategy != null) { 134 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 135 if (getAggregationStrategyMethodAllowNull() != null) { 136 adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull()); 137 adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull()); 138 } 139 strategy = adapter; 140 } else { 141 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef); 142 } 143 } 144 145 if (strategy != null && strategy instanceof CamelContextAware) { 146 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 147 } 148 149 return strategy; 150 } 151 152 public String getResourceUri() { 153 return resourceUri; 154 } 155 156 public void setResourceUri(String resourceUri) { 157 this.resourceUri = resourceUri; 158 } 159 160 public String getResourceRef() { 161 return resourceRef; 162 } 163 164 public void setResourceRef(String resourceRef) { 165 this.resourceRef = resourceRef; 166 } 167 168 public Long getTimeout() { 169 return timeout; 170 } 171 172 public void setTimeout(Long timeout) { 173 this.timeout = timeout; 174 } 175 176 public String getAggregationStrategyRef() { 177 return aggregationStrategyRef; 178 } 179 180 public void setAggregationStrategyRef(String aggregationStrategyRef) { 181 this.aggregationStrategyRef = aggregationStrategyRef; 182 } 183 184 public String getAggregationStrategyMethodName() { 185 return aggregationStrategyMethodName; 186 } 187 188 public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) { 189 this.aggregationStrategyMethodName = aggregationStrategyMethodName; 190 } 191 192 public Boolean getAggregationStrategyMethodAllowNull() { 193 return aggregationStrategyMethodAllowNull; 194 } 195 196 public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { 197 this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; 198 } 199 200 public AggregationStrategy getAggregationStrategy() { 201 return aggregationStrategy; 202 } 203 204 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 205 this.aggregationStrategy = aggregationStrategy; 206 } 207 }