001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.resequencer;
018    
019    import java.util.Timer;
020    
021    import org.apache.camel.util.concurrent.ThreadHelper;
022    
023    /**
024     * Resequences elements based on a given {@link SequenceElementComparator}.
025     * This resequencer is designed for resequencing element streams. Stream-based
026     * resequencing has the advantage that the number of elements to be resequenced
027     * need not be known in advance. Resequenced elements are delivered via a
028     * {@link SequenceSender}.
029     * <p>
030     * The resequencer's behaviour for a given comparator is controlled by the
031     * <code>timeout</code> property. This is the timeout (in milliseconds) for a
032     * given element managed by this resequencer. An out-of-sequence element can
033     * only be marked as <i>ready-for-delivery</i> if it either times out or if it
034     * has an immediate predecessor (in that case it is in-sequence). If an
035     * immediate predecessor of a waiting element arrives the timeout task for the
036     * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
037     * <p>
038     * If the maximum out-of-sequence time difference between elements within a
039     * stream is known, the <code>timeout</code> value should be set to this
040     * value. In this case it is guaranteed that all elements of a stream will be
041     * delivered in sequence via the {@link SequenceSender}. The lower the
042     * <code>timeout</code> value is compared to the out-of-sequence time
043     * difference between elements within a stream the higher the probability is for
044     * out-of-sequence elements delivered by this resequencer. Delivery of elements
045     * must be explicitly triggered by applications using the {@link #deliver()} or
046     * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i>
047     * are delivered by these methods. The longer an application waits to trigger a
048     * delivery the more elements may become <i>ready-for-delivery</i>.
049     * <p>
050     * The resequencer remembers the last-delivered element. If an element arrives
051     * which is the immediate successor of the last-delivered element it is
052     * <i>ready-for-delivery</i> immediately. After delivery the last-delivered
053     * element is adjusted accordingly. If the last-delivered element is
054     * <code>null</code> i.e. the resequencer was newly created the first arriving
055     * element needs <code>timeout</code> milliseconds in any case for becoming
056     * <i>ready-for-delivery</i>.
057     * <p>
058     *
059     * @version 
060     */
061    public class ResequencerEngine<E> {
062    
063        /**
064         * The element that most recently hash been delivered or <code>null</code>
065         * if no element has been delivered yet.
066         */
067        private Element<E> lastDelivered;
068    
069        /**
070         * Minimum amount of time to wait for out-of-sequence elements.
071         */
072        private long timeout;
073    
074        /**
075         * A sequence of elements for sorting purposes.
076         */
077        private Sequence<Element<E>> sequence;
078    
079        /**
080         * A timer for scheduling timeout notifications.
081         */
082        private Timer timer;
083    
084        /**
085         * A strategy for sending sequence elements.
086         */
087        private SequenceSender<E> sequenceSender;
088    
089        /**
090         * Indicates whether an error should be thrown if message older (based on Comparator) than the last delivered message is received.
091         */
092        private Boolean rejectOld;
093    
094        /**
095         * Creates a new resequencer instance with a default timeout of 2000
096         * milliseconds.
097         *
098         * @param comparator a sequence element comparator.
099         */
100        public ResequencerEngine(SequenceElementComparator<E> comparator) {
101            this.sequence = createSequence(comparator);
102            this.timeout = 2000L;
103            this.lastDelivered = null;
104        }
105    
106        public void start() {
107            timer = new Timer(ThreadHelper.resolveThreadName("Camel Thread ${counter} - ${name}", "Stream Resequencer Timer"), true);
108        }
109    
110        /**
111         * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
112         */
113        public void stop() {
114            timer.cancel();
115        }
116    
117        /**
118         * Returns the number of elements currently maintained by this resequencer.
119         *
120         * @return the number of elements currently maintained by this resequencer.
121         */
122        public synchronized int size() {
123            return sequence.size();
124        }
125    
126        /**
127         * Returns this resequencer's timeout value.
128         *
129         * @return the timeout in milliseconds.
130         */
131        public long getTimeout() {
132            return timeout;
133        }
134    
135        /**
136         * Sets this sequencer's timeout value.
137         *
138         * @param timeout the timeout in milliseconds.
139         */
140        public void setTimeout(long timeout) {
141            this.timeout = timeout;
142        }
143    
144        public Boolean getRejectOld() {
145            return rejectOld;
146        }
147    
148        public void setRejectOld(Boolean rejectOld) {
149            this.rejectOld = rejectOld;
150        }
151    
152        /**
153         * Returns the sequence sender.
154         *
155         * @return the sequence sender.
156         */
157        public SequenceSender<E> getSequenceSender() {
158            return sequenceSender;
159        }
160    
161        /**
162         * Sets the sequence sender.
163         *
164         * @param sequenceSender a sequence element sender.
165         */
166        public void setSequenceSender(SequenceSender<E> sequenceSender) {
167            this.sequenceSender = sequenceSender;
168        }
169    
170        /**
171         * Returns the last delivered element.
172         *
173         * @return the last delivered element or <code>null</code> if no delivery
174         *         has been made yet.
175         */
176        E getLastDelivered() {
177            if (lastDelivered == null) {
178                return null;
179            }
180            return lastDelivered.getObject();
181        }
182    
183        /**
184         * Sets the last delivered element. This is for testing purposes only.
185         *
186         * @param o an element.
187         */
188        void setLastDelivered(E o) {
189            lastDelivered = new Element<E>(o);
190        }
191    
192        /**
193         * Inserts the given element into this resequencer. If the element is not
194         * ready for immediate delivery and has no immediate presecessor then it is
195         * scheduled for timing out. After being timed out it is ready for delivery.
196         *
197         * @param o an element.
198         * @throws IllegalArgumentException if the element cannot be used with this resequencer engine
199         */
200        public synchronized void insert(E o) {
201            // wrap object into internal element
202            Element<E> element = new Element<E>(o);
203    
204            // validate the exchange has no problem
205            if (!sequence.comparator().isValid(element)) {
206                throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator());
207            }
208    
209            // validate the exchange shouldn't be 'rejected' (if applicable)
210            if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
211                throw new MessageRejectedException("rejecting message [" + element.getObject()
212                        + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
213            }
214    
215            // add element to sequence in proper order
216            sequence.add(element);
217    
218            Element<E> successor = sequence.successor(element);
219    
220            // check if there is an immediate successor and cancel
221            // timer task (no need to wait any more for timeout)
222            if (successor != null) {
223                successor.cancel();
224            }
225    
226            // start delivery if current element is successor of last delivered element
227            if (successorOfLastDelivered(element)) {
228                // nothing to schedule
229            } else if (sequence.predecessor(element) != null) {
230                // nothing to schedule
231            } else {
232                element.schedule(defineTimeout());
233            }
234        }
235    
236        /**
237         * Delivers all elements which are currently ready to deliver.
238         *
239         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
240         *
241         * @see ResequencerEngine#deliverNext() 
242         */
243        public synchronized void deliver() throws Exception {
244            while (deliverNext()) {
245                // do nothing here
246            }
247        }
248    
249        /**
250         * Attempts to deliver a single element from the head of the resequencer
251         * queue (sequence). Only elements which have not been scheduled for timing
252         * out or which already timed out can be delivered. Elements are delivered via
253         * {@link SequenceSender#sendElement(Object)}.
254         *
255         * @return <code>true</code> if the element has been delivered
256         *         <code>false</code> otherwise.
257         *
258         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
259         *
260         */
261        public boolean deliverNext() throws Exception {
262            if (sequence.size() == 0) {
263                return false;
264            }
265            // inspect element with lowest sequence value
266            Element<E> element = sequence.first();
267    
268            // if element is scheduled do not deliver and return
269            if (element.scheduled()) {
270                return false;
271            }
272    
273            // remove deliverable element from sequence
274            sequence.remove(element);
275    
276            // set the delivered element to last delivered element
277            lastDelivered = element;
278    
279            // deliver the sequence element
280            sequenceSender.sendElement(element.getObject());
281    
282            // element has been delivered
283            return true;
284        }
285    
286        /**
287         * Returns <code>true</code> if the given element is the immediate
288         * successor of the last delivered element.
289         *
290         * @param element an element.
291         * @return <code>true</code> if the given element is the immediate
292         *         successor of the last delivered element.
293         */
294        private boolean successorOfLastDelivered(Element<E> element) {
295            if (lastDelivered == null) {
296                return false;
297            }
298            if (sequence.comparator().successor(element, lastDelivered)) {
299                return true;
300            }
301            return false;
302        }
303    
304        /**
305         * Retuns <code>true</code> if the given element is before the last delivered element.
306         *
307         * @param element an element.
308         * @return <code>true</code> if the given element is before the last delivered element.
309         */
310        private boolean beforeLastDelivered(Element<E> element) {
311            if (lastDelivered == null) {
312                return false;
313            }
314            if (sequence.comparator().compare(element, lastDelivered) < 0) {
315                return true;
316            }
317            return false;
318        }
319    
320        /**
321         * Creates a timeout task based on the timeout setting of this resequencer.
322         *
323         * @return a new timeout task.
324         */
325        private Timeout defineTimeout() {
326            return new Timeout(timer, timeout);
327        }
328    
329        private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
330            return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
331        }
332    
333    }