001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.Consumer;
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Service;
025    import org.apache.camel.util.ServiceHelper;
026    
027    /**
028     * A {@link org.apache.camel.spi.PollingConsumerPollStrategy} which supports suspending consumers if they
029     * failed for X number of times in a row.
030     * <p/>
031     * If Camel cannot successfully consumer from a given consumer, then after X consecutive failed attempts the consumer
032     * will be suspended/stopped. This prevents the log to get flooded with failed attempts, for example during nightly runs.
033     *
034     * @version 
035     */
036    public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy implements Service {
037    
038        private final Map<Consumer, Integer> state = new HashMap<Consumer, Integer>();
039        private int limit = 3;
040    
041        public int getLimit() {
042            return limit;
043        }
044    
045        /**
046         * Sets the limit for how many straight rollbacks causes this strategy to suspend the fault consumer.
047         * <p/>
048         * When the consumer has been suspended, it has to be manually resumed/started to be active again.
049         * The limit is by default 3.
050         *
051         * @param limit  the limit
052         */
053        public void setLimit(int limit) {
054            this.limit = limit;
055        }
056    
057        @Override
058        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
059            // we could commit so clear state
060            state.remove(consumer);
061        }
062    
063        public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
064            // keep track how many times in a row we have rolled back
065            Integer times = state.get(consumer);
066            if (times == null) {
067                times = 1;
068            } else {
069                times += 1;
070            }
071            log.debug("Rollback occurred after {} times when consuming {}", times, endpoint);
072    
073            boolean retry = false;
074    
075            if (times >= limit) {
076                // clear state when we suspend so if its restarted manually we start all over again
077                state.remove(consumer);
078                onSuspend(consumer, endpoint);
079            } else {
080                // error occurred
081                state.put(consumer, times);
082                retry = onRollback(consumer, endpoint);
083            }
084    
085            return retry;
086        }
087    
088        /**
089         * The consumer is to be suspended because it exceeded the limit
090         *
091         * @param consumer the consumer
092         * @param endpoint the endpoint
093         * @throws Exception is thrown if error suspending the consumer
094         */
095        protected void onSuspend(Consumer consumer, Endpoint endpoint) throws Exception {
096            log.warn("Suspending consumer " + consumer + " after " + limit + " attempts to consume from " + endpoint
097                    + ". You have to manually resume the consumer!");
098            ServiceHelper.suspendService(consumer);
099        }
100    
101        /**
102         * Rollback occurred.
103         *
104         * @param consumer the consumer
105         * @param endpoint the endpoint
106         * @return whether or not to retry immediately, is default <tt>false</tt>
107         * @throws Exception can be thrown in case something goes wrong
108         */
109        protected boolean onRollback(Consumer consumer, Endpoint endpoint) throws Exception {
110            // do not retry by default
111            return false;
112        }
113    
114        public void start() throws Exception {
115            // noop
116        }
117    
118        public void stop() throws Exception {
119            state.clear();
120        }
121    }