001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import javax.xml.bind.annotation.XmlAccessType;
020    import javax.xml.bind.annotation.XmlAccessorType;
021    import javax.xml.bind.annotation.XmlAttribute;
022    import javax.xml.bind.annotation.XmlRootElement;
023    import javax.xml.bind.annotation.XmlTransient;
024    
025    import org.apache.camel.Expression;
026    import org.apache.camel.Processor;
027    import org.apache.camel.processor.idempotent.IdempotentConsumer;
028    import org.apache.camel.spi.IdempotentRepository;
029    import org.apache.camel.spi.RouteContext;
030    import org.apache.camel.util.ObjectHelper;
031    
032    /**
033     * Represents an XML <idempotentConsumer/> element
034     */
035    @XmlRootElement(name = "idempotentConsumer")
036    @XmlAccessorType(XmlAccessType.FIELD)
037    public class IdempotentConsumerDefinition extends ExpressionNode {
038        @XmlAttribute
039        private String messageIdRepositoryRef;
040        @XmlAttribute
041        private Boolean eager;
042        @XmlAttribute
043        private Boolean skipDuplicate;
044        @XmlAttribute
045        private Boolean removeOnFailure;
046        @XmlTransient
047        private IdempotentRepository<?> idempotentRepository;
048    
049        public IdempotentConsumerDefinition() {
050        }
051    
052        public IdempotentConsumerDefinition(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
053            super(messageIdExpression);
054            this.idempotentRepository = idempotentRepository;
055        }
056    
057        @Override
058        public String toString() {
059            return "IdempotentConsumer[" + getExpression() + " -> " + getOutputs() + "]";
060        }
061    
062        @Override
063        public String getLabel() {
064            return "idempotentConsumer[" + getExpression() + "]";
065        }
066    
067        @Override
068        public String getShortName() {
069            return "idempotentConsumer";
070        }
071    
072        // Fluent API
073        //-------------------------------------------------------------------------
074    
075        /**
076         * Sets the reference name of the message id repository
077         *
078         * @param messageIdRepositoryRef the reference name of message id repository
079         * @return builder
080         */
081        public IdempotentConsumerDefinition messageIdRepositoryRef(String messageIdRepositoryRef) {
082            setMessageIdRepositoryRef(messageIdRepositoryRef);
083            return this;
084        }
085    
086        /**
087         * Sets the the message id repository for the idempotent consumer
088         *
089         * @param idempotentRepository the repository instance of idempotent
090         * @return builder
091         */
092        public IdempotentConsumerDefinition messageIdRepository(IdempotentRepository<?> idempotentRepository) {
093            setMessageIdRepository(idempotentRepository);
094            return this;
095        }
096    
097        /**
098         * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
099         * is complete. Eager is default enabled.
100         *
101         * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
102         *              the exchange is complete.
103         * @return builder
104         */
105        public IdempotentConsumerDefinition eager(boolean eager) {
106            setEager(eager);
107            return this;
108        }
109    
110        /**
111         * Sets whether to remove or keep the key on failure.
112         * <p/>
113         * The default behavior is to remove the key on failure.
114         *
115         * @param removeOnFailure <tt>true</tt> to remove the key, <tt>false</tt> to keep the key
116         *                        if the exchange fails.
117         * @return builder
118         */
119        public IdempotentConsumerDefinition removeOnFailure(boolean removeOnFailure) {
120            setRemoveOnFailure(removeOnFailure);
121            return this;
122        }
123    
124        /**
125         * Sets whether to skip duplicates or not.
126         * <p/>
127         * The default behavior is to skip duplicates.
128         * <p/>
129         * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
130         * to a {@link Boolean#TRUE} value. A none duplicate message will not have this property set.
131         *
132         * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates.
133         * @return builder
134         */
135        public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
136            setSkipDuplicate(skipDuplicate);
137            return this;
138        }
139    
140        public String getMessageIdRepositoryRef() {
141            return messageIdRepositoryRef;
142        }
143    
144        public void setMessageIdRepositoryRef(String messageIdRepositoryRef) {
145            this.messageIdRepositoryRef = messageIdRepositoryRef;
146        }
147    
148        public IdempotentRepository<?> getMessageIdRepository() {
149            return idempotentRepository;
150        }
151    
152        public void setMessageIdRepository(IdempotentRepository<?> idempotentRepository) {
153            this.idempotentRepository = idempotentRepository;
154        }
155    
156        public Boolean getEager() {
157            return eager;
158        }
159    
160        public void setEager(Boolean eager) {
161            this.eager = eager;
162        }
163    
164        public boolean isEager() {
165            // defaults to true if not configured
166            return eager != null ? eager : true;
167        }
168    
169        public Boolean getSkipDuplicate() {
170            return skipDuplicate;
171        }
172    
173        public void setSkipDuplicate(Boolean skipDuplicate) {
174            this.skipDuplicate = skipDuplicate;
175        }
176    
177        public boolean isSkipDuplicate() {
178            // defaults to true if not configured
179            return skipDuplicate != null ? skipDuplicate : true;
180        }
181    
182        public Boolean getRemoveOnFailure() {
183            return removeOnFailure;
184        }
185    
186        public void setRemoveOnFailure(Boolean removeOnFailure) {
187            this.removeOnFailure = removeOnFailure;
188        }
189    
190        public boolean isRemoveOnFailure() {
191            // defaults to true if not configured
192            return removeOnFailure != null ? removeOnFailure : true;
193        }
194    
195    
196        @Override
197        @SuppressWarnings("unchecked")
198        public Processor createProcessor(RouteContext routeContext) throws Exception {
199            Processor childProcessor = this.createChildProcessor(routeContext, true);
200    
201            IdempotentRepository<String> idempotentRepository =
202                    (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
203            ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
204    
205            // add as service to CamelContext so we can managed it and it ensures it will be shutdown when camel shutdowns
206            routeContext.getCamelContext().addService(idempotentRepository);
207    
208            Expression expression = getExpression().createExpression(routeContext);
209    
210            return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), isRemoveOnFailure(), childProcessor);
211        }
212    
213        /**
214         * Strategy method to resolve the {@link org.apache.camel.spi.IdempotentRepository} to use
215         *
216         * @param routeContext route context
217         * @return the repository
218         */
219        protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
220            if (messageIdRepositoryRef != null) {
221                idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
222            }
223            return idempotentRepository;
224        }
225    }