001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.Method;
020    import java.util.Set;
021    import javax.xml.bind.annotation.XmlTransient;
022    
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.CamelContextAware;
025    import org.apache.camel.Consume;
026    import org.apache.camel.Consumer;
027    import org.apache.camel.ConsumerTemplate;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.IsSingleton;
030    import org.apache.camel.NoSuchBeanException;
031    import org.apache.camel.PollingConsumer;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Producer;
034    import org.apache.camel.ProducerTemplate;
035    import org.apache.camel.ProxyInstantiationException;
036    import org.apache.camel.Service;
037    import org.apache.camel.component.bean.BeanInfo;
038    import org.apache.camel.component.bean.BeanProcessor;
039    import org.apache.camel.component.bean.ProxyHelper;
040    import org.apache.camel.processor.CamelInternalProcessor;
041    import org.apache.camel.processor.UnitOfWorkProducer;
042    import org.apache.camel.util.CamelContextHelper;
043    import org.apache.camel.util.IntrospectionSupport;
044    import org.apache.camel.util.ObjectHelper;
045    import org.apache.camel.util.ServiceHelper;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A helper class for Camel based injector or post processing hooks which can be reused by
051     * both the <a href="http://camel.apache.org/spring.html">Spring</a>,
052     * <a href="http://camel.apache.org/guice.html">Guice</a> and
053     * <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support.
054     *
055     * @version 
056     */
057    public class CamelPostProcessorHelper implements CamelContextAware {
058        private static final Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class);
059    
060        @XmlTransient
061        private CamelContext camelContext;
062    
063        public CamelPostProcessorHelper() {
064        }
065    
066        public CamelPostProcessorHelper(CamelContext camelContext) {
067            this.setCamelContext(camelContext);
068        }
069    
070        public CamelContext getCamelContext() {
071            return camelContext;
072        }
073    
074        public void setCamelContext(CamelContext camelContext) {
075            this.camelContext = camelContext;
076        }
077    
078        /**
079         * Does the given context match this camel context
080         */
081        public boolean matchContext(String context) {
082            if (ObjectHelper.isNotEmpty(context)) {
083                if (!getCamelContext().getName().equals(context)) {
084                    return false;
085                }
086            }
087            return true;
088        }
089    
090        public void consumerInjection(Method method, Object bean, String beanName) {
091            Consume consume = method.getAnnotation(Consume.class);
092            if (consume != null && matchContext(consume.context())) {
093                LOG.debug("Creating a consumer for: " + consume);
094                subscribeMethod(method, bean, beanName, consume.uri(), consume.ref(), consume.property());
095            }
096        }
097    
098        public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName, String endpointProperty) {
099            // lets bind this method to a listener
100            String injectionPointName = method.getName();
101            Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointName, endpointProperty, injectionPointName, true);
102            if (endpoint != null) {
103                try {
104                    Processor processor = createConsumerProcessor(bean, method, endpoint);
105                    Consumer consumer = endpoint.createConsumer(processor);
106                    LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
107                    startService(consumer, bean, beanName);
108                } catch (Exception e) {
109                    throw ObjectHelper.wrapRuntimeCamelException(e);
110                }
111            }
112        }
113    
114        /**
115         * Stats the given service
116         */
117        protected void startService(Service service, Object bean, String beanName) throws Exception {
118            if (isSingleton(bean, beanName)) {
119                getCamelContext().addService(service);
120            } else {
121                LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
122                ServiceHelper.startService(service);
123            }
124        }
125    
126        /**
127         * Create a processor which invokes the given method when an incoming
128         * message exchange is received
129         */
130        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
131            BeanInfo info = new BeanInfo(getCamelContext(), method);
132            BeanProcessor answer = new BeanProcessor(pojo, info);
133            // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
134            CamelInternalProcessor internal = new CamelInternalProcessor(answer);
135            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
136            return internal;
137        }
138    
139        public Endpoint getEndpointInjection(Object bean, String uri, String name, String propertyName,
140                                             String injectionPointName, boolean mandatory) {
141            if (ObjectHelper.isEmpty(uri) && ObjectHelper.isEmpty(name)) {
142                // if no uri or ref, then fallback and try the endpoint property
143                return doGetEndpointInjection(bean, propertyName, injectionPointName);
144            } else {
145                return doGetEndpointInjection(uri, name, injectionPointName, mandatory);
146            }
147        }
148    
149        private Endpoint doGetEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
150            return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
151        }
152    
153        /**
154         * Gets the injection endpoint from a bean property.
155         * @param bean the bean
156         * @param propertyName the property name on the bean
157         */
158        private Endpoint doGetEndpointInjection(Object bean, String propertyName, String injectionPointName) {
159            // fall back and use the method name if no explicit property name was given
160            if (ObjectHelper.isEmpty(propertyName)) {
161                propertyName = injectionPointName;
162            }
163    
164            // we have a property name, try to lookup a getter method on the bean with that name using this strategy
165            // 1. first the getter with the name as given
166            // 2. then the getter with Endpoint as postfix
167            // 3. then if start with on then try step 1 and 2 again, but omit the on prefix
168            try {
169                Object value = IntrospectionSupport.getOrElseProperty(bean, propertyName, null);
170                if (value == null) {
171                    // try endpoint as postfix
172                    value = IntrospectionSupport.getOrElseProperty(bean, propertyName + "Endpoint", null);
173                }
174                if (value == null && propertyName.startsWith("on")) {
175                    // retry but without the on as prefix
176                    propertyName = propertyName.substring(2);
177                    return doGetEndpointInjection(bean, propertyName, injectionPointName);
178                }
179                if (value == null) {
180                    return null;
181                } else if (value instanceof Endpoint) {
182                    return (Endpoint) value;
183                } else {
184                    String uriOrRef = getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, value);
185                    return getCamelContext().getEndpoint(uriOrRef);
186                }
187            } catch (Exception e) {
188                throw new IllegalArgumentException("Error getting property " + propertyName + " from bean " + bean + " due " + e.getMessage(), e);
189            }
190        }
191    
192        /**
193         * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
194         */
195        public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String endpointProperty,
196                                        String injectionPointName, Object bean, String beanName) {
197            if (type.isAssignableFrom(ProducerTemplate.class)) {
198                return createInjectionProducerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName, bean);
199            } else if (type.isAssignableFrom(ConsumerTemplate.class)) {
200                return createInjectionConsumerTemplate(endpointUri, endpointRef, endpointProperty, injectionPointName);
201            } else {
202                Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, true);
203                if (endpoint != null) {
204                    if (type.isInstance(endpoint)) {
205                        return endpoint;
206                    } else if (type.isAssignableFrom(Producer.class)) {
207                        return createInjectionProducer(endpoint, bean, beanName);
208                    } else if (type.isAssignableFrom(PollingConsumer.class)) {
209                        return createInjectionPollingConsumer(endpoint, bean, beanName);
210                    } else if (type.isInterface()) {
211                        // lets create a proxy
212                        try {
213                            return ProxyHelper.createProxy(endpoint, type);
214                        } catch (Exception e) {
215                            throw createProxyInstantiationRuntimeException(type, endpoint, e);
216                        }
217                    } else {
218                        throw new IllegalArgumentException("Invalid type: " + type.getName()
219                                + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
220                    }
221                }
222                return null;
223            }
224        }
225    
226        public Object getInjectionPropertyValue(Class<?> type, String propertyName, String propertyDefaultValue,
227                                                String injectionPointName, Object bean, String beanName) {
228            try {
229                String key;
230                String prefix = getCamelContext().getPropertyPrefixToken();
231                String suffix = getCamelContext().getPropertySuffixToken();
232                if (!propertyName.contains(prefix)) {
233                    // must enclose the property name with prefix/suffix to have it resolved
234                    key = prefix + propertyName + suffix;
235                } else {
236                    // key has already prefix/suffix so use it as-is as it may be a compound key
237                    key = propertyName;
238                }
239                String value = getCamelContext().resolvePropertyPlaceholders(key);
240                if (value != null) {
241                    return getCamelContext().getTypeConverter().mandatoryConvertTo(type, value);
242                } else {
243                    return null;
244                }
245            } catch (Exception e) {
246                if (ObjectHelper.isNotEmpty(propertyDefaultValue)) {
247                    try {
248                        return getCamelContext().getTypeConverter().mandatoryConvertTo(type, propertyDefaultValue);
249                    } catch (Exception e2) {
250                        throw ObjectHelper.wrapRuntimeCamelException(e2);
251                    }
252                }
253                throw ObjectHelper.wrapRuntimeCamelException(e);
254            }
255        }
256    
257        public Object getInjectionBeanValue(Class<?> type, String name) {
258            if (ObjectHelper.isEmpty(name)) {
259                Set<?> found = getCamelContext().getRegistry().findByType(type);
260                if (found == null || found.isEmpty()) {
261                    throw new NoSuchBeanException(name, type.getName());
262                } else if (found.size() > 1) {
263                    throw new NoSuchBeanException("Found " + found.size() + " beans of type: " + type + ". Only one bean expected.");
264                } else {
265                    // we found only one
266                    return found.iterator().next();
267                }
268            } else {
269                return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
270            }
271        }
272    
273        /**
274         * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
275         */
276        protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String endpointProperty,
277                                                                   String injectionPointName, Object bean) {
278            // endpoint is optional for this injection point
279            Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false);
280            ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
281            // start the template so its ready to use
282            try {
283                answer.start();
284            } catch (Exception e) {
285                throw ObjectHelper.wrapRuntimeCamelException(e);
286            }
287            return answer;
288        }
289    
290        /**
291         * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
292         */
293        protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String endpointProperty,
294                                                                   String injectionPointName) {
295            ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
296            // start the template so its ready to use
297            try {
298                answer.start();
299            } catch (Exception e) {
300                throw ObjectHelper.wrapRuntimeCamelException(e);
301            }
302            return answer;
303        }
304    
305        /**
306         * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
307         */
308        protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
309            try {
310                PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
311                startService(pollingConsumer, bean, beanName);
312                return pollingConsumer;
313            } catch (Exception e) {
314                throw ObjectHelper.wrapRuntimeCamelException(e);
315            }
316        }
317    
318        /**
319         * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
320         */
321        protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
322            try {
323                Producer producer = endpoint.createProducer();
324                startService(producer, bean, beanName);
325                return new UnitOfWorkProducer(producer);
326            } catch (Exception e) {
327                throw ObjectHelper.wrapRuntimeCamelException(e);
328            }
329        }
330    
331        protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
332            return new ProxyInstantiationException(type, endpoint, e);
333        }
334    
335        /**
336         * Implementations can override this method to determine if the bean is singleton.
337         *
338         * @param bean the bean
339         * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned.
340         */
341        protected boolean isSingleton(Object bean, String beanName) {
342            if (bean instanceof IsSingleton) {
343                IsSingleton singleton = (IsSingleton) bean;
344                return singleton.isSingleton();
345            }
346            return true;
347        }
348    }