001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util.concurrent;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.CompletionService;
021    import java.util.concurrent.DelayQueue;
022    import java.util.concurrent.Delayed;
023    import java.util.concurrent.Executor;
024    import java.util.concurrent.Future;
025    import java.util.concurrent.FutureTask;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicInteger;
028    
/**
 * A {@link java.util.concurrent.CompletionService} that hands out the completed tasks
 * in the same order as they were submitted.
 * <p/>
 * Each submitted task is assigned an incrementing id. When a task is done it is added to an
 * internal {@link DelayQueue}, and it only becomes available for {@link #take()} / {@link #poll()}
 * once the consumer side has advanced its index up to the id of that task.
 *
 * @version 
 */
public class SubmitOrderedCompletionService<V> implements CompletionService<V> {
    
    private final Executor executor;

    // the idea to order the completed tasks in the same order as they were submitted is to leverage
    // the delay queue. With the delay queue we can control the order by the getDelay and compareTo methods
    // where we can order the tasks in the same order as they were submitted.
    private final DelayQueue<SubmitOrderFutureTask> completionQueue = new DelayQueue<SubmitOrderFutureTask>();

    // id is the unique id that determines the order in which tasks were submitted (incrementing)
    private final AtomicInteger id = new AtomicInteger();
    // index is the index of the next id that should expire and thus be ready to take from the delayed queue
    private final AtomicInteger index = new AtomicInteger();

    /**
     * A future task which remembers the id it was assigned at submit time, and which
     * adds itself to the completion queue when it is done.
     */
    private class SubmitOrderFutureTask extends FutureTask<V> implements Delayed {

        // the id this task was assigned
        private final long id;

        public SubmitOrderFutureTask(long id, Callable<V> voidCallable) {
            super(voidCallable);
            this.id = id;
        }

        public SubmitOrderFutureTask(long id, Runnable runnable, V result) {
            super(runnable, result);
            this.id = id;
        }

        /**
         * Returns how far this task is ahead of the consumer index. The given unit is ignored,
         * as the value is a distance between ids and not a duration.
         *
         * @param unit not used
         * @return zero or a negative number when this task is ready to be taken, otherwise a positive number
         */
        public long getDelay(TimeUnit unit) {
            // if the answer is 0 (or less) then this task is ready to be taken
            return id - index.get();
        }

        /**
         * Orders the tasks by their id, so the task submitted first is at the head of the queue.
         */
        @SuppressWarnings("unchecked")
        public int compareTo(Delayed o) {
            SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
            // compare explicitly instead of narrowing the long difference to an int,
            // which could flip the sign for ids that are far apart
            if (this.id < other.id) {
                return -1;
            } else if (this.id > other.id) {
                return 1;
            }
            return 0;
        }

        @Override
        protected void done() {
            // when we are done add to the completion queue
            completionQueue.add(this);
        }

        @Override
        public String toString() {
            // the first id assigned is 1, so output using zero-based index
            return "SubmitOrderedFutureTask[" + (id - 1) + "]";
        }
    }

    /**
     * Creates a completion service which uses the given executor to run the submitted tasks.
     *
     * @param executor the executor to use
     */
    public SubmitOrderedCompletionService(Executor executor) {
        this.executor = executor;
    }

    /**
     * Submits the task for execution, and assigns it the next id in the submit order.
     *
     * @param task the task to execute
     * @return a future for the task
     * @throws IllegalArgumentException if the task is <tt>null</tt>
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task);
        executor.execute(f);
        return f;
    }

    /**
     * Submits the task for execution, and assigns it the next id in the submit order.
     * <p/>
     * The result parameter is declared as {@link Object} (the erasure of <tt>V</tt>), so it is
     * converted with an unchecked cast. Passing a value which is not a <tt>V</tt> is not detected here.
     *
     * @param task   the task to execute
     * @param result the result the returned future yields when the task completes successfully
     * @return a future for the task
     * @throws IllegalArgumentException if the task is <tt>null</tt>
     */
    public Future<V> submit(Runnable task, Object result) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        // hand the given result to the future task, so Future.get() returns it
        @SuppressWarnings("unchecked")
        V value = (V) result;
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task, value);
        executor.execute(f);
        return f;
    }

    /**
     * Advances to the next task in the submit order, and waits until it can be taken
     * from the completion queue.
     *
     * @return the future of the next task
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        index.incrementAndGet();
        try {
            return completionQueue.take();
        } catch (InterruptedException e) {
            // decrease counter as we didn't get any data, so the submit order is kept for the next attempt
            index.decrementAndGet();
            throw e;
        }
    }

    /**
     * Advances to the next task in the submit order, and retrieves it if it is ready.
     *
     * @return the future of the next task, or <tt>null</tt> if it is not ready yet
     */
    public Future<V> poll() {
        index.incrementAndGet();
        Future<V> answer = completionQueue.poll();
        if (answer == null) {
            // decrease counter if we didn't get any data
            index.decrementAndGet();
        }
        return answer;
    }

    /**
     * Advances to the next task in the submit order, and retrieves it, waiting up to the
     * given timeout for it to become ready.
     *
     * @param timeout how long to wait
     * @param unit    the time unit of the timeout
     * @return the future of the next task, or <tt>null</tt> if it was not ready within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        index.incrementAndGet();
        Future<V> answer;
        try {
            answer = completionQueue.poll(timeout, unit);
        } catch (InterruptedException e) {
            // decrease counter as we didn't get any data, so the submit order is kept for the next attempt
            index.decrementAndGet();
            throw e;
        }
        if (answer == null) {
            // decrease counter if we didn't get any data
            index.decrementAndGet();
        }
        return answer;
    }

    /**
     * Marks the current task as timed out, which allows you to poll the next
     * tasks which may already have been completed.
     */
    public void timeoutTask() {
        index.incrementAndGet();
    }

}