001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Iterator;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.Queue;
025    import java.util.concurrent.ConcurrentLinkedQueue;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.locks.Condition;
028    import java.util.concurrent.locks.Lock;
029    import java.util.concurrent.locks.ReentrantLock;
030    
031    import org.apache.camel.AsyncCallback;
032    import org.apache.camel.AsyncProcessor;
033    import org.apache.camel.CamelContext;
034    import org.apache.camel.CamelExchangeException;
035    import org.apache.camel.Exchange;
036    import org.apache.camel.Expression;
037    import org.apache.camel.Navigate;
038    import org.apache.camel.Predicate;
039    import org.apache.camel.Processor;
040    import org.apache.camel.impl.LoggingExceptionHandler;
041    import org.apache.camel.spi.ExceptionHandler;
042    import org.apache.camel.support.ServiceSupport;
043    import org.apache.camel.util.AsyncProcessorHelper;
044    import org.apache.camel.util.ObjectHelper;
045    import org.apache.camel.util.ServiceHelper;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A base class for any kind of {@link Processor} which implements some kind of batch processing.
051     * 
052     * @version 
053     * @deprecated may be removed in the future when we overhaul the resequencer EIP
054     */
055    @Deprecated
056    public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor> {
057    
058        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
059        public static final int DEFAULT_BATCH_SIZE = 100;
060    
061        private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class);
062    
063        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
064        private int batchSize = DEFAULT_BATCH_SIZE;
065        private int outBatchSize;
066        private boolean groupExchanges;
067        private boolean batchConsumer;
068        private boolean ignoreInvalidExchanges;
069        private Predicate completionPredicate;
070        private Expression expression;
071    
072        private final CamelContext camelContext;
073        private final Processor processor;
074        private final Collection<Exchange> collection;
075        private ExceptionHandler exceptionHandler;
076    
077        private final BatchSender sender;
078    
079        public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection, Expression expression) {
080            ObjectHelper.notNull(camelContext, "camelContext");
081            ObjectHelper.notNull(processor, "processor");
082            ObjectHelper.notNull(collection, "collection");
083            ObjectHelper.notNull(expression, "expression");
084    
085            // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
086            this.camelContext = camelContext;
087            this.processor = processor;
088            this.collection = collection;
089            this.expression = expression;
090            this.sender = new BatchSender();
091            this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
092        }
093    
094        @Override
095        public String toString() {
096            return "BatchProcessor[to: " + processor + "]";
097        }
098    
099        // Properties
100        // -------------------------------------------------------------------------
101        public ExceptionHandler getExceptionHandler() {
102            return exceptionHandler;
103        }
104    
105        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
106            this.exceptionHandler = exceptionHandler;
107        }
108    
109        public int getBatchSize() {
110            return batchSize;
111        }
112    
113        /**
114         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
115         * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
116         * 
117         * @param batchSize the size
118         */
119        public void setBatchSize(int batchSize) {
120            // setting batch size to 0 or negative is like disabling it, so we set it as the max value
121            // as the code logic is dependent on a batch size having 1..n value
122            if (batchSize <= 0) {
123                LOG.debug("Disabling batch size, will only be triggered by timeout");
124                this.batchSize = Integer.MAX_VALUE;
125            } else {
126                this.batchSize = batchSize;
127            }
128        }
129    
130        public int getOutBatchSize() {
131            return outBatchSize;
132        }
133    
134        /**
135         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
136         * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
137         * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
138         * 
139         * @param outBatchSize the size
140         */
141        public void setOutBatchSize(int outBatchSize) {
142            this.outBatchSize = outBatchSize;
143        }
144    
145        public long getBatchTimeout() {
146            return batchTimeout;
147        }
148    
149        public void setBatchTimeout(long batchTimeout) {
150            this.batchTimeout = batchTimeout;
151        }
152    
153        public boolean isGroupExchanges() {
154            return groupExchanges;
155        }
156    
157        public void setGroupExchanges(boolean groupExchanges) {
158            this.groupExchanges = groupExchanges;
159        }
160    
161        public boolean isBatchConsumer() {
162            return batchConsumer;
163        }
164    
165        public void setBatchConsumer(boolean batchConsumer) {
166            this.batchConsumer = batchConsumer;
167        }
168    
169        public boolean isIgnoreInvalidExchanges() {
170            return ignoreInvalidExchanges;
171        }
172    
173        public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
174            this.ignoreInvalidExchanges = ignoreInvalidExchanges;
175        }
176    
177        public Predicate getCompletionPredicate() {
178            return completionPredicate;
179        }
180    
181        public void setCompletionPredicate(Predicate completionPredicate) {
182            this.completionPredicate = completionPredicate;
183        }
184    
185        public Processor getProcessor() {
186            return processor;
187        }
188    
189        public List<Processor> next() {
190            if (!hasNext()) {
191                return null;
192            }
193            List<Processor> answer = new ArrayList<Processor>(1);
194            answer.add(processor);
195            return answer;
196        }
197    
198        public boolean hasNext() {
199            return processor != null;
200        }
201    
202        /**
203         * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
204         * the in queue should be drained to the "out" collection.
205         */
206        private boolean isInBatchCompleted(int num) {
207            return num >= batchSize;
208        }
209    
210        /**
211         * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
212         * the out collection should be sent.
213         */
214        private boolean isOutBatchCompleted() {
215            if (outBatchSize == 0) {
216                // out batch is disabled, so go ahead and send.
217                return true;
218            }
219            return collection.size() > 0 && collection.size() >= outBatchSize;
220        }
221    
222        /**
223         * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
224         * custom processing before or after an individual exchange is processed
225         */
226        protected void processExchange(Exchange exchange) throws Exception {
227            processor.process(exchange);
228            if (exchange.getException() != null) {
229                getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
230            }
231        }
232    
233        protected void doStart() throws Exception {
234            ServiceHelper.startServices(processor);
235            sender.start();
236        }
237    
238        protected void doStop() throws Exception {
239            sender.cancel();
240            ServiceHelper.stopServices(processor);
241            collection.clear();
242        }
243    
244        public void process(Exchange exchange) throws Exception {
245            AsyncProcessorHelper.process(this, exchange);
246        }
247    
248        /**
249         * Enqueues an exchange for later batch processing.
250         */
251        public boolean process(Exchange exchange, AsyncCallback callback) {
252            try {
253                // if batch consumer is enabled then we need to adjust the batch size
254                // with the size from the batch consumer
255                if (isBatchConsumer()) {
256                    int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
257                    if (batchSize != size) {
258                        batchSize = size;
259                        LOG.trace("Using batch consumer completion, so setting batch size to: {}", batchSize);
260                    }
261                }
262    
263                // validate that the exchange can be used
264                if (!isValid(exchange)) {
265                    if (isIgnoreInvalidExchanges()) {
266                        LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange);
267                    } else {
268                        throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange);
269                    }
270                } else {
271                    // exchange is valid so enqueue the exchange
272                    sender.enqueueExchange(exchange);
273                }
274            } catch (Throwable e) {
275                exchange.setException(e);
276            }
277            callback.done(true);
278            return true;
279        }
280    
281        /**
282         * Is the given exchange valid to be used.
283         *
284         * @param exchange the given exchange
285         * @return <tt>true</tt> if valid, <tt>false</tt> otherwise
286         */
287        private boolean isValid(Exchange exchange) {
288            Object result = null;
289            try {
290                result = expression.evaluate(exchange, Object.class);
291            } catch (Exception e) {
292                // ignore
293            }
294            return result != null;
295        }
296    
297        /**
298         * Sender thread for queued-up exchanges.
299         */
300        private class BatchSender extends Thread {
301    
302            private Queue<Exchange> queue;
303            private Lock queueLock = new ReentrantLock();
304            private boolean exchangeEnqueued;
305            private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>();
306            private Condition exchangeEnqueuedCondition = queueLock.newCondition();
307    
308            public BatchSender() {
309                super(camelContext.getExecutorServiceManager().resolveThreadName("Batch Sender"));
310                this.queue = new LinkedList<Exchange>();
311            }
312    
313            @Override
314            public void run() {
315                // Wait until one of either:
316                // * an exchange being queued;
317                // * the batch timeout expiring; or
318                // * the thread being cancelled.
319                //
320                // If an exchange is queued then we need to determine whether the
321                // batch is complete. If it is complete then we send out the batched
322                // exchanges. Otherwise we move back into our wait state.
323                //
324                // If the batch times out then we send out the batched exchanges
325                // collected so far.
326                //
327                // If we receive an interrupt then all blocking operations are
328                // interrupted and our thread terminates.
329                //
330                // The goal of the following algorithm in terms of synchronisation
331                // is to provide fine grained locking i.e. retaining the lock only
332                // when required. Special consideration is given to releasing the
333                // lock when calling an overloaded method i.e. sendExchanges. 
334                // Unlocking is important as the process of sending out the exchanges
335                // would otherwise block new exchanges from being queued.
336    
337                queueLock.lock();
338                try {
339                    do {
340                        try {
341                            if (!exchangeEnqueued) {
342                                LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after {} ms.", batchTimeout);
343                                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
344                            }
345    
346                            // if the completion predicate was triggered then there is an exchange id which denotes when to complete
347                            String id = null;
348                            if (!completionPredicateMatched.isEmpty()) {
349                                id = completionPredicateMatched.poll();
350                            }
351    
352                            if (id != null || !exchangeEnqueued) {
353                                if (id != null) {
354                                    LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate");
355                                } else {
356                                    LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout");
357                                }
358                                drainQueueTo(collection, batchSize, id);
359                            } else {
360                                exchangeEnqueued = false;
361                                boolean drained = false;
362                                while (isInBatchCompleted(queue.size())) {
363                                    drained = true;
364                                    drainQueueTo(collection, batchSize, id);
365                                }
366                                if (drained) {
367                                    LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received");
368                                }
369    
370                                if (!isOutBatchCompleted()) {
371                                    continue;
372                                }
373                            }
374    
375                            queueLock.unlock();
376                            try {
377                                try {
378                                    sendExchanges();
379                                } catch (Throwable t) {
380                                    // a fail safe to handle all exceptions being thrown
381                                    getExceptionHandler().handleException(t);
382                                }
383                            } finally {
384                                queueLock.lock();
385                            }
386    
387                        } catch (InterruptedException e) {
388                            break;
389                        }
390    
391                    } while (isRunAllowed());
392    
393                } finally {
394                    queueLock.unlock();
395                }
396            }
397    
398            /**
399             * This method should be called with queueLock held
400             */
401            private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) {
402                for (int i = 0; i < batchSize; ++i) {
403                    Exchange e = queue.poll();
404                    if (e != null) {
405                        try {
406                            collection.add(e);
407                        } catch (Exception t) {
408                            e.setException(t);
409                        } catch (Throwable t) {
410                            getExceptionHandler().handleException(t);
411                        }
412                        if (exchangeId != null && exchangeId.equals(e.getExchangeId())) {
413                            // this batch is complete so stop draining
414                            break;
415                        }
416                    } else {
417                        break;
418                    }
419                }
420            }
421    
422            public void cancel() {
423                interrupt();
424            }
425    
426            public void enqueueExchange(Exchange exchange) {
427                LOG.debug("Received exchange to be batched: {}", exchange);
428                queueLock.lock();
429                try {
430                    // pre test whether the completion predicate matched
431                    if (completionPredicate != null) {
432                        boolean matches = completionPredicate.matches(exchange);
433                        if (matches) {
434                            LOG.trace("Exchange matched completion predicate: {}", exchange);
435                            // add this exchange to the list of exchanges which marks the batch as complete
436                            completionPredicateMatched.add(exchange.getExchangeId());
437                        }
438                    }
439                    queue.add(exchange);
440                    exchangeEnqueued = true;
441                    exchangeEnqueuedCondition.signal();
442                } finally {
443                    queueLock.unlock();
444                }
445            }
446            
447            private void sendExchanges() throws Exception {
448                Iterator<Exchange> iter = collection.iterator();
449                while (iter.hasNext()) {
450                    Exchange exchange = iter.next();
451                    iter.remove();
452                    try {
453                        LOG.debug("Sending aggregated exchange: {}", exchange);
454                        processExchange(exchange);
455                    } catch (Throwable t) {
456                        // must catch throwable to avoid growing memory
457                        getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
458                    }
459                }
460            }
461        }
462    
463    }