EMMA Coverage Report (generated Sun Sep 18 11:34:27 PHT 2011)
[all classes][org.apache.continuum.taskqueueexecutor]

COVERAGE SUMMARY FOR SOURCE FILE [ParallelBuildsThreadedTaskQueueExecutor.java]

nameclass, %method, %block, %line, %
ParallelBuildsThreadedTaskQueueExecutor.java75%  (3/4)63%  (17/27)39%  (188/481)36%  (39.8/112)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ParallelBuildsThreadedTaskQueueExecutor100% (1/1)65%  (11/17)40%  (65/164)38%  (12.8/34)
access$000 (ParallelBuildsThreadedTaskQueueExecutor): Task 0%   (0/1)0%   (0/3)0%   (0/1)
access$500 (ParallelBuildsThreadedTaskQueueExecutor): String 0%   (0/1)0%   (0/3)0%   (0/1)
cancelTask (Task): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
getName (): String 0%   (0/1)0%   (0/3)0%   (0/1)
setName (String): void 0%   (0/1)0%   (0/4)0%   (0/2)
stop (): void 0%   (0/1)0%   (0/76)0%   (0/16)
initialize (): void 100% (1/1)50%  (5/10)67%  (2/3)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
ParallelBuildsThreadedTaskQueueExecutor (): void 100% (1/1)100% (3/3)100% (2/2)
access$002 (ParallelBuildsThreadedTaskQueueExecutor, Task): Task 100% (1/1)100% (5/5)100% (1/1)
access$100 (ParallelBuildsThreadedTaskQueueExecutor): TaskQueue 100% (1/1)100% (3/3)100% (1/1)
access$200 (): Logger 100% (1/1)100% (2/2)100% (1/1)
access$300 (ParallelBuildsThreadedTaskQueueExecutor): TaskExecutor 100% (1/1)100% (3/3)100% (1/1)
access$400 (ParallelBuildsThreadedTaskQueueExecutor): ExecutorService 100% (1/1)100% (3/3)100% (1/1)
getCurrentTask (): Task 100% (1/1)100% (3/3)100% (1/1)
getQueue (): TaskQueue 100% (1/1)100% (3/3)100% (1/1)
start (): void 100% (1/1)100% (31/31)100% (6/6)
     
class ParallelBuildsThreadedTaskQueueExecutor$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class ParallelBuildsThreadedTaskQueueExecutor$ExecutorRunnable100% (1/1)50%  (4/8)34%  (100/294)30%  (22/73)
cancel (Future): void 0%   (0/1)0%   (0/37)0%   (0/6)
cancelTask (Task): boolean 0%   (0/1)0%   (0/29)0%   (0/9)
isDone (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
shutdown (): void 0%   (0/1)0%   (0/9)0%   (0/4)
run (): void 100% (1/1)38%  (41/107)48%  (11/23)
waitForTask (Task, Future): void 100% (1/1)49%  (49/99)34%  (10/29)
ParallelBuildsThreadedTaskQueueExecutor$ExecutorRunnable (ParallelBuildsThrea... 100% (1/1)100% (6/6)100% (1/1)
ParallelBuildsThreadedTaskQueueExecutor$ExecutorRunnable (ParallelBuildsThrea... 100% (1/1)100% (4/4)100% (1/1)
     
class ParallelBuildsThreadedTaskQueueExecutor$ExecutorRunnable$1100% (1/1)100% (2/2)100% (23/23)100% (6/6)
ParallelBuildsThreadedTaskQueueExecutor$ExecutorRunnable$1 (ParallelBuildsThr... 100% (1/1)100% (9/9)100% (1/1)
run (): void 100% (1/1)100% (14/14)100% (5/5)

1package org.apache.continuum.taskqueueexecutor;
2 
3/*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements.  See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership.  The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License.  You may obtain a copy of the License at
11 *
12 *   http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied.  See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21 
22import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
23import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
24import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
25import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
26import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
27import org.codehaus.plexus.taskqueue.Task;
28import org.codehaus.plexus.taskqueue.TaskQueue;
29import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
30import org.codehaus.plexus.taskqueue.execution.TaskExecutor;
31import org.codehaus.plexus.taskqueue.execution.TaskQueueExecutor;
32import org.codehaus.plexus.util.StringUtils;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35 
36import edu.emory.mathcs.backport.java.util.concurrent.CancellationException;
37import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
38import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
39import edu.emory.mathcs.backport.java.util.concurrent.Executors;
40import edu.emory.mathcs.backport.java.util.concurrent.Future;
41import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
42import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
43 
44/**
45 * Modified plexus ThreadedTaskQueueExecutor
46 */
47public class ParallelBuildsThreadedTaskQueueExecutor
48    implements TaskQueueExecutor, Initializable, Startable
49{
50    private static final Logger log = LoggerFactory.getLogger( ParallelBuildsThreadedTaskQueueExecutor.class );
51 
52    private static final int SHUTDOWN = 1;
53 
54    private static final int CANCEL_TASK = 2;
55 
56    /**
57     * @requirement
58     */
59    private TaskQueue queue;
60 
61    /**
62     * @requirement
63     */
64    private TaskExecutor executor;
65 
66    /**
67     * @configuration
68     */
69    private String name;
70 
71    // ----------------------------------------------------------------------
72    //
73    // ----------------------------------------------------------------------
74 
75    private ExecutorRunnable executorRunnable;
76 
77    private ExecutorService executorService;
78 
79    private Task currentTask;
80 
81    private class ExecutorRunnable
82        extends Thread
83    {
84        private volatile int command;
85 
86        private boolean done;
87 
88        public void run()
89        {
90            while ( command != SHUTDOWN )
91            {
92                final Task task;
93 
94                currentTask = null;
95 
96                try
97                {
98                    task = queue.poll( 100, TimeUnit.MILLISECONDS );
99                }
100                catch ( InterruptedException e )
101                {
102                    log.info( "Executor thread interrupted, command: " +
103                        ( command == SHUTDOWN ? "Shutdown" : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
104                    continue;
105                }
106 
107                if ( task == null )
108                {
109                    continue;
110                }
111 
112                currentTask = task;
113 
114                Future future = executorService.submit( new Runnable()
115                {
116                    public void run()
117                    {
118                        try
119                        {
120                            executor.executeTask( task );
121                        }
122                        catch ( TaskExecutionException e )
123                        {
124                            log.error( "Error executing task", e );
125                        }
126                    }
127                } );
128 
129                try
130                {
131                    waitForTask( task, future );
132                }
133                catch ( ExecutionException e )
134                {
135                    log.error( "Error executing task", e );
136                }
137            }
138 
139            currentTask = null;
140 
141            log.info( "Executor thread '" + name + "' exited." );
142 
143            done = true;
144 
145            synchronized ( this )
146            {
147                notifyAll();
148            }
149        }
150 
151        private void waitForTask( Task task, Future future )
152            throws ExecutionException
153        {
154            boolean stop = false;
155 
156            while ( !stop )
157            {
158                try
159                {
160                    if ( task.getMaxExecutionTime() == 0 )
161                    {
162                        log.debug( "Waiting indefinitely for task to complete" );
163                        future.get();
164                        return;
165                    }
166                    else
167                    {
168                        log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" );
169                        future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS );
170                        log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" );
171                        return;
172                    }
173                }
174                catch ( InterruptedException e )
175                {
176                    switch ( command )
177                    {
178                        case SHUTDOWN:
179                        {
180                            log.info( "Shutdown command received. Cancelling task." );
181                            cancel( future );
182                            return;
183                        }
184 
185                        case CANCEL_TASK:
186                        {
187                            command = 0;
188                            log.info( "Cancelling task" );
189                            cancel( future );
190                            return;
191                        }
192 
193                        default:
194                            // when can this thread be interrupted, and should we ignore it if shutdown = false?
195                            log.warn( "Interrupted while waiting for task to complete; ignoring", e );
196                            break;
197                    }
198                }
199                catch ( TimeoutException e )
200                {
201                    log.warn( "Task " + task + " didn't complete within time, cancelling it." );
202                    cancel( future );
203                    return;
204                }
205                catch ( CancellationException e )
206                {
207                    log.warn( "The task was cancelled", e );
208                    return;
209                }
210            }
211        }
212 
213        private void cancel( Future future )
214        {
215            if ( !future.cancel( true ) )
216            {
217                if ( !future.isDone() && !future.isCancelled() )
218                {
219                    log.warn( "Unable to cancel task" );
220                }
221                else
222                {
223                    log.warn(
224                        "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() +
225                            ")" );
226                }
227            }
228            else
229            {
230                log.debug( "Task successfully cancelled" );
231            }
232        }
233 
234        public synchronized void shutdown()
235        {
236            log.debug( "Signalling executor thread to shutdown" );
237 
238            command = SHUTDOWN;
239 
240            interrupt();
241        }
242 
243        public synchronized boolean cancelTask( Task task )
244        {
245            if ( !task.equals( currentTask ) )
246            {
247                log.debug( "Not cancelling task - it is not running" );
248                return false;
249            }
250 
251            if ( command != SHUTDOWN )
252            {
253                log.debug( "Signalling executor thread to cancel task" );
254 
255                command = CANCEL_TASK;
256 
257                interrupt();
258            }
259            else
260            {
261                log.debug( "Executor thread already stopping; task will be cancelled automatically" );
262            }
263 
264            return true;
265        }
266 
267        public boolean isDone()
268        {
269            return done;
270        }
271    }
272 
273    // ----------------------------------------------------------------------
274    // Component lifecycle
275    // ----------------------------------------------------------------------
276 
277    public void initialize()
278        throws InitializationException
279    {
280        if ( StringUtils.isEmpty( name ) )
281        {
282            throw new IllegalArgumentException( "'name' must be set." );
283        }
284    }
285 
286    public void start()
287        throws StartingException
288    {
289        log.info( "Starting task executor, thread name '" + name + "'." );
290 
291        this.executorService = Executors.newCachedThreadPool();
292 
293        executorRunnable = new ExecutorRunnable();
294 
295        executorRunnable.setDaemon( true );
296 
297        executorRunnable.start();
298    }
299 
300    public void stop()
301        throws StoppingException
302    {
303        executorRunnable.shutdown();
304 
305        int maxSleep = 10 * 1000; // 10 seconds
306 
307        int interval = 1000;
308 
309        long endTime = System.currentTimeMillis() + maxSleep;
310 
311        while ( !executorRunnable.isDone() && executorRunnable.isAlive() )
312        {
313            if ( System.currentTimeMillis() > endTime )
314            {
315                log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" );
316                break;
317            }
318 
319            log.info( "Waiting until task executor '" + name + "' is idling..." );
320 
321            try
322            {
323                synchronized ( executorRunnable )
324                {
325                    executorRunnable.wait( interval );
326                }
327            }
328            catch ( InterruptedException ex )
329            {
330                // ignore
331            }
332 
333            // notify again, just in case.
334            executorRunnable.shutdown();
335        }
336    }
337 
338    public Task getCurrentTask()
339    {
340        return currentTask;
341    }
342 
343    public synchronized boolean cancelTask( Task task )
344    {
345        return executorRunnable.cancelTask( task );
346    }
347 
348    public String getName()
349    {
350        return name;
351    }
352 
353    public void setName( String name )
354    {
355        this.name = name;
356    }
357 
358    public TaskQueue getQueue()
359    {
360        return queue;
361    }
362}

[all classes][org.apache.continuum.taskqueueexecutor]
EMMA 2.0.5312 (C) Vladimir Roubtsov