001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.bean;
018    
019    import java.lang.reflect.InvocationHandler;
020    import java.lang.reflect.Method;
021    import java.lang.reflect.Type;
022    import java.util.ArrayList;
023    import java.util.Arrays;
024    import java.util.List;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.FutureTask;
030    
031    import org.apache.camel.CamelContext;
032    import org.apache.camel.CamelExchangeException;
033    import org.apache.camel.Endpoint;
034    import org.apache.camel.Exchange;
035    import org.apache.camel.ExchangePattern;
036    import org.apache.camel.InvalidPayloadException;
037    import org.apache.camel.Producer;
038    import org.apache.camel.RuntimeCamelException;
039    import org.apache.camel.impl.DefaultExchange;
040    import org.apache.camel.util.ObjectHelper;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    public abstract class AbstractCamelInvocationHandler implements InvocationHandler {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
047        private static final List<Method> EXCLUDED_METHODS = new ArrayList<Method>();
048        private static ExecutorService executorService;
049        protected final Endpoint endpoint;
050        protected final Producer producer;
051    
052        static {
053            // exclude all java.lang.Object methods as we dont want to invoke them
054            EXCLUDED_METHODS.addAll(Arrays.asList(Object.class.getMethods()));
055        }
056    
057        public AbstractCamelInvocationHandler(Endpoint endpoint, Producer producer) {
058            this.endpoint = endpoint;
059            this.producer = producer;
060        }
061    
062        private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
063            // get the body from the Exchange from either OUT or IN
064            if (exchange.hasOut()) {
065                if (exchange.getOut().getBody() != null) {
066                    return exchange.getOut().getMandatoryBody(type);
067                } else {
068                    return null;
069                }
070            } else {
071                if (exchange.getIn().getBody() != null) {
072                    return exchange.getIn().getMandatoryBody(type);
073                } else {
074                    return null;
075                }
076            }
077        }
078    
079        @Override
080        public final Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
081            if (isValidMethod(method)) {
082                return doInvokeProxy(proxy, method, args);
083            } else {
084                // invalid method then invoke methods on this instead
085                if ("toString".equals(method.getName())) {
086                    return this.toString();
087                } else if ("hashCode".equals(method.getName())) {
088                    return this.hashCode();
089                } else if ("equals".equals(method.getName())) {
090                    return Boolean.FALSE;
091                }
092                return null;
093            }
094        }
095    
096        public abstract Object doInvokeProxy(final Object proxy, final Method method, final Object[] args) throws Throwable;
097    
098        protected Object invokeWithBody(final Method method, Object body, final ExchangePattern pattern) throws Throwable {
099            final Exchange exchange = new DefaultExchange(endpoint, pattern);
100            exchange.getIn().setBody(body);
101    
102            // is the return type a future
103            final boolean isFuture = method.getReturnType() == Future.class;
104    
105            // create task to execute the proxy and gather the reply
106            FutureTask<Object> task = new FutureTask<Object>(new Callable<Object>() {
107                public Object call() throws Exception {
108                    // process the exchange
109                    LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
110                    producer.process(exchange);
111    
112                    Object answer = afterInvoke(method, exchange, pattern, isFuture);
113                    LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
114                    return answer;
115                }
116            });
117    
118            if (isFuture) {
119                // submit task and return future
120                if (LOG.isTraceEnabled()) {
121                    LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
122                }
123                getExecutorService(exchange.getContext()).submit(task);
124                return task;
125            } else {
126                // execute task now
127                try {
128                    task.run();
129                    return task.get();
130                } catch (ExecutionException e) {
131                    // we don't want the wrapped exception from JDK
132                    throw e.getCause();
133                }
134            }
135        }
136    
137        protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
138            // check if we had an exception
139            Throwable cause = exchange.getException();
140            if (cause != null) {
141                Throwable found = findSuitableException(cause, method);
142                if (found != null) {
143                    if (found instanceof Exception) {
144                        throw (Exception)found;
145                    } else {
146                        // wrap as exception
147                        throw new CamelExchangeException("Error processing exchange", exchange, cause);
148                    }
149                }
150                // special for runtime camel exceptions as they can be nested
151                if (cause instanceof RuntimeCamelException) {
152                    // if the inner cause is a runtime exception we can throw it
153                    // directly
154                    if (cause.getCause() instanceof RuntimeException) {
155                        throw (RuntimeException)((RuntimeCamelException)cause).getCause();
156                    }
157                    throw (RuntimeCamelException)cause;
158                }
159                // okay just throw the exception as is
160                if (cause instanceof Exception) {
161                    throw (Exception)cause;
162                } else {
163                    // wrap as exception
164                    throw new CamelExchangeException("Error processing exchange", exchange, cause);
165                }
166            }
167    
168            Class<?> to = isFuture ? getGenericType(exchange.getContext(), method.getGenericReturnType()) : method.getReturnType();
169    
170            // do not return a reply if the method is VOID
171            if (to == Void.TYPE) {
172                return null;
173            }
174    
175            return getBody(exchange, to);
176        }
177    
178        protected static Class<?> getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
179            if (type == null) {
180                // fallback and use object
181                return Object.class;
182            }
183    
184            // unfortunately java dont provide a nice api for getting the generic
185            // type of the return type
186            // due type erasure, so we have to gather it based on a String
187            // representation
188            String name = ObjectHelper.between(type.toString(), "<", ">");
189            if (name != null) {
190                if (name.contains("<")) {
191                    // we only need the outer type
192                    name = ObjectHelper.before(name, "<");
193                }
194                return context.getClassResolver().resolveMandatoryClass(name);
195            } else {
196                // fallback and use object
197                return Object.class;
198            }
199        }
200    
201        @SuppressWarnings("deprecation")
202        protected static synchronized ExecutorService getExecutorService(CamelContext context) {
203            // CamelContext will shutdown thread pool when it shutdown so we can
204            // lazy create it on demand
205            // but in case of hot-deploy or the likes we need to be able to
206            // re-create it (its a shared static instance)
207            if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) {
208                // try to lookup a pool first based on id/profile
209                executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
210                if (executorService == null) {
211                    executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
212                }
213            }
214            return executorService;
215        }
216    
217        /**
218         * Tries to find the best suited exception to throw.
219         * <p/>
220         * It looks in the exception hierarchy from the caused exception and matches
221         * this against the declared exceptions being thrown on the method.
222         * 
223         * @param cause the caused exception
224         * @param method the method
225         * @return the exception to throw, or <tt>null</tt> if not possible to find
226         *         a suitable exception
227         */
228        protected Throwable findSuitableException(Throwable cause, Method method) {
229            if (method.getExceptionTypes() == null || method.getExceptionTypes().length == 0) {
230                return null;
231            }
232    
233            // see if there is any exception which matches the declared exception on
234            // the method
235            for (Class<?> type : method.getExceptionTypes()) {
236                Object fault = ObjectHelper.getException(type, cause);
237                if (fault != null) {
238                    return Throwable.class.cast(fault);
239                }
240            }
241    
242            return null;
243        }
244    
245        protected boolean isValidMethod(Method method) {
246            // must not be in the excluded list
247            for (Method excluded : EXCLUDED_METHODS) {
248                if (ObjectHelper.isOverridingMethod(excluded, method)) {
249                    // the method is overriding an excluded method so its not valid
250                    return false;
251                }
252            }
253            return true;
254        }
255    
256    }