001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    
022    import org.apache.camel.CamelContext;
023    import org.apache.camel.Component;
024    import org.apache.camel.PollingConsumer;
025    import org.apache.camel.ResolveEndpointFailedException;
026    import org.apache.camel.util.EndpointHelper;
027    import org.apache.camel.util.IntrospectionSupport;
028    
029    /**
030     * A base class for {@link org.apache.camel.Endpoint} which creates a {@link ScheduledPollConsumer}
031     *
032     * @version 
033     */
034    public abstract class ScheduledPollEndpoint extends DefaultEndpoint {
035    
036        private static final String SPRING_SCHEDULER = "org.apache.camel.spring.pollingconsumer.SpringScheduledPollConsumerScheduler";
037        private static final String QUARTZ_2_SCHEDULER = "org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler";
038    
039        protected ScheduledPollEndpoint(String endpointUri, Component component) {
040            super(endpointUri, component);
041        }
042    
043        @Deprecated
044        protected ScheduledPollEndpoint(String endpointUri, CamelContext context) {
045            super(endpointUri, context);
046        }
047    
048        @Deprecated
049        protected ScheduledPollEndpoint(String endpointUri) {
050            super(endpointUri);
051        }
052    
053        protected ScheduledPollEndpoint() {
054        }
055    
056        public void configureProperties(Map<String, Object> options) {
057            super.configureProperties(options);
058            configureScheduledPollConsumerProperties(options, getConsumerProperties());
059        }
060    
061        protected void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
062            // special for scheduled poll consumers as we want to allow end users to configure its options
063            // from the URI parameters without the consumer. prefix
064            Map<String, Object> schedulerProperties = IntrospectionSupport.extractProperties(options, "scheduler.");
065            Object startScheduler = options.remove("startScheduler");
066            Object initialDelay = options.remove("initialDelay");
067            Object delay = options.remove("delay");
068            Object timeUnit = options.remove("timeUnit");
069            Object useFixedDelay = options.remove("useFixedDelay");
070            Object pollStrategy = options.remove("pollStrategy");
071            Object runLoggingLevel = options.remove("runLoggingLevel");
072            Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle");
073            Object greedy = options.remove("greedy");
074            Object scheduledExecutorService  = options.remove("scheduledExecutorService");
075            Object scheduler  = options.remove("scheduler");
076            Object backoffMultiplier  = options.remove("backoffMultiplier");
077            Object backoffIdleThreshold  = options.remove("backoffIdleThreshold");
078            Object backoffErrorThreshold  = options.remove("backoffErrorThreshold");
079            boolean setConsumerProperties = false;
080    
081            // the following is split into two if statements to satisfy the checkstyle max complexity constraint
082            if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
083                setConsumerProperties = true;
084            }
085            if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) {
086                setConsumerProperties = true;
087            }
088            if (scheduler != null || !schedulerProperties.isEmpty() || backoffMultiplier != null || backoffIdleThreshold != null || backoffErrorThreshold != null) {
089                setConsumerProperties = true;
090            }
091    
092            if (setConsumerProperties) {
093    
094                if (consumerProperties == null) {
095                    consumerProperties = new HashMap<String, Object>();
096                }
097                if (initialDelay != null) {
098                    consumerProperties.put("initialDelay", initialDelay);
099                }
100                if (startScheduler != null) {
101                    consumerProperties.put("startScheduler", startScheduler);
102                }
103                if (delay != null) {
104                    consumerProperties.put("delay", delay);
105                }
106                if (timeUnit != null) {
107                    consumerProperties.put("timeUnit", timeUnit);
108                }
109                if (useFixedDelay != null) {
110                    consumerProperties.put("useFixedDelay", useFixedDelay);
111                }
112                if (pollStrategy != null) {
113                    consumerProperties.put("pollStrategy", pollStrategy);
114                }
115                if (runLoggingLevel != null) {
116                    consumerProperties.put("runLoggingLevel", runLoggingLevel);
117                }
118                if (sendEmptyMessageWhenIdle != null) {
119                    consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle);
120                }
121                if (greedy != null) {
122                    consumerProperties.put("greedy", greedy);
123                }
124                if (scheduledExecutorService != null) {
125                    consumerProperties.put("scheduledExecutorService", scheduledExecutorService);
126                }
127                if (scheduler != null) {
128                    // special for scheduler if its "spring"
129                    if ("spring".equals(scheduler)) {
130                        try {
131                            Class<?> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(SPRING_SCHEDULER);
132                            scheduler = getCamelContext().getInjector().newInstance(clazz);
133                        } catch (ClassNotFoundException e) {
134                            throw new IllegalArgumentException("Cannot load " + SPRING_SCHEDULER + " from classpath. Make sure camel-spring.jar is on the classpath.", e);
135                        }
136                    } else if ("quartz2".equals(scheduler)) {
137                        try {
138                            Class<?> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(QUARTZ_2_SCHEDULER);
139                            scheduler = getCamelContext().getInjector().newInstance(clazz);
140                        } catch (ClassNotFoundException e) {
141                            throw new IllegalArgumentException("Cannot load " + QUARTZ_2_SCHEDULER + " from classpath. Make sure camel-quarz2.jar is on the classpath.", e);
142                        }
143                    }
144                    consumerProperties.put("scheduler", scheduler);
145                }
146                if (!schedulerProperties.isEmpty()) {
147                    consumerProperties.put("schedulerProperties", schedulerProperties);
148                }
149                if (backoffMultiplier != null) {
150                    consumerProperties.put("backoffMultiplier", backoffMultiplier);
151                }
152                if (backoffIdleThreshold != null) {
153                    consumerProperties.put("backoffIdleThreshold", backoffIdleThreshold);
154                }
155                if (backoffErrorThreshold != null) {
156                    consumerProperties.put("backoffErrorThreshold", backoffErrorThreshold);
157                }
158            }
159        }
160    
161        @Override
162        protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
163            Map<String, Object> copy = new HashMap<String, Object>(getConsumerProperties());
164            Map<String, Object> throwaway = new HashMap<String, Object>();
165    
166            // filter out unwanted options which is intended for the scheduled poll consumer
167            // as these options are not supported on the polling consumer
168            configureScheduledPollConsumerProperties(copy, throwaway);
169    
170            // set reference properties first as they use # syntax that fools the regular properties setter
171            EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
172            EndpointHelper.setProperties(getCamelContext(), consumer, copy);
173    
174            if (!isLenientProperties() && copy.size() > 0) {
175                throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
176                        + " parameters that couldn't be set on the endpoint polling consumer."
177                        + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
178                        + " Unknown consumer parameters=[" + copy + "]");
179            }
180        }
181    
182    }