Coverage report

  %line %branch
org.apache.jetspeed.aggregator.impl.WorkerMonitorImpl$RenderingJobTimeoutMonitor
0% 
0% 

 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  * 
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  * 
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 
 18  
 package org.apache.jetspeed.aggregator.impl;
 19  
 
 20  
 import java.security.AccessControlContext;
 21  
 import java.security.AccessController;
 22  
 import java.util.Iterator;
 23  
 import java.util.ArrayList;
 24  
 import java.util.List;
 25  
 import java.util.Stack;
 26  
 import java.util.LinkedList;
 27  
 import java.util.Collections;
 28  
 
 29  
 import org.apache.commons.logging.Log;
 30  
 import org.apache.commons.logging.LogFactory;
 31  
 import org.apache.jetspeed.aggregator.RenderingJob;
 32  
 import org.apache.jetspeed.aggregator.Worker;
 33  
 import org.apache.jetspeed.aggregator.WorkerMonitor;
 34  
 import org.apache.jetspeed.aggregator.PortletContent;
 35  
 import org.apache.jetspeed.util.Queue;
 36  
 import org.apache.jetspeed.util.FIFOQueue;
 37  
 
 38  
 import org.apache.pluto.om.window.PortletWindow;
 39  
 import org.apache.pluto.om.common.ObjectID;
 40  
 
 41  
 /**
 42  
  * The WorkerMonitor is responsible for dispatching jobs to workers
 43  
  * It uses an Apache HTTPd configuration style of min/max/spare workers
 44  
  * threads to throttle the rendering work.
 45  
  * If jobs come in faster that processing, they are stored in a queue
 46  
  * which is flushed periodically by a QueueMonitor.
 47  
  *
 48  
  * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a>
 49  
  * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
 50  
  * @version $Id: WorkerMonitorImpl.java 591867 2007-11-05 02:20:06Z woonsan $
 51  
  */
 52  
 public class WorkerMonitorImpl implements WorkerMonitor
 53  
 {
 54  
     public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
 55  
 
 56  
     public WorkerMonitorImpl(int minWorkers, class="keyword">int maxWorkers, class="keyword">int spareWorkers, class="keyword">int maxJobsPerWorker)
 57  
     {
 58  
         this.minWorkers = minWorkers;
 59  
         this.maxWorkers = maxWorkers;
 60  
         this.spareWorkers = spareWorkers;
 61  
         this.maxJobsPerWorker = maxJobsPerWorker;
 62  
     }
 63  
     
 64  
     /** Commons logging */
 65  
     protected final static Log log = LogFactory.getLog(WorkerMonitorImpl.class);
 66  
 
 67  
     /** Static counters for identifying workers */
 68  
     protected static long sCount = 0;
 69  
 
 70  
     /** Count of running jobs **/
 71  
     protected int runningJobs = 0;
 72  
     
 73  
     /** Minimum number of wokers to create */
 74  
     protected int minWorkers = 5;
 75  
 
 76  
     /** Maximum number of workers */
 77  
     protected int maxWorkers = 50;
 78  
 
 79  
     /** Minimum amount of spare workers */
 80  
     protected int spareWorkers = 3;
 81  
 
 82  
     /** Maximum of job processed by a worker before being released */
 83  
     protected int maxJobsPerWorker = 10;
 84  
 
 85  
     /** Stack containing currently idle workers */
 86  
     protected Stack workers = new Stack();
 87  
 
 88  
     /** The thread group used to group all worker threads */
 89  
     protected ThreadGroup tg = new ThreadGroup("Workers");
 90  
 
 91  
     /** Job queue */
 92  
     protected Queue queue;
 93  
 
 94  
     /** Workers to be monitored for timeout checking */
 95  
     protected List workersMonitored = Collections.synchronizedList(new LinkedList());
 96  
 
 97  
     /** Renering Job Timeout monitor */
 98  
     protected RenderingJobTimeoutMonitor jobMonitor = null;
 99  
 
 100  
     public void start()
 101  
     {
 102  
         addWorkers(this.minWorkers);
 103  
         this.queue = new FIFOQueue();
 104  
 
 105  
         jobMonitor = new RenderingJobTimeoutMonitor(1000);
 106  
         jobMonitor.start();
 107  
     }
 108  
 
 109  
     public void stop()
 110  
     {    
 111  
     	if (jobMonitor != null)
 112  
     		jobMonitor.endThread();
 113  
     	jobMonitor = null;
 114  
     	
 115  
     }
 116  
 
 117  
     /**
 118  
      * Create the request number of workers and add them to
 119  
      * list of available workers.
 120  
      *
 121  
      * @param wCount the number of workers to create
 122  
      */
 123  
     protected synchronized void addWorkers(int wCount)
 124  
     {
 125  
         int wCurrent = this.tg.activeCount();
 126  
 
 127  
         if (wCurrent < maxWorkers)
 128  
         {
 129  
             if (wCurrent + wCount > maxWorkers)
 130  
             {
 131  
                 wCount = maxWorkers - wCurrent;
 132  
             }
 133  
 
 134  
             log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
 135  
 
 136  
             for (int i = 0; i < wCount; ++i)
 137  
             {
 138  
                 Worker worker = new WorkerImpl(this, class="keyword">this.tg, "WORKER_" + (++sCount));
 139  
                 worker.start();
 140  
                 workers.push(worker);
 141  
             }
 142  
         }
 143  
     }
 144  
 
 145  
     /**
 146  
      * Retrieves an idle worker
 147  
      *
 148  
      * @return a Worker from the idle pool or null if non available
 149  
      */
 150  
     protected Worker getWorker()
 151  
     {
 152  
         synchronized(this.workers)
 153  
         {
 154  
             if (this.workers.size() < spareWorkers)
 155  
             {
 156  
                 addWorkers(spareWorkers);
 157  
             }
 158  
 
 159  
             if (this.workers.size() == 0)
 160  
             {
 161  
                 return null;
 162  
             }
 163  
 
 164  
             return (Worker)workers.pop();
 165  
         }
 166  
     }
 167  
 
 168  
     /**
 169  
      * Assign a job to a worker and execute it or queue the job if no
 170  
      * worker is available.
 171  
      *
 172  
      * @param job the Job to process
 173  
      */
 174  
     public void process(RenderingJob job)
 175  
     {
 176  
         Worker worker = this.getWorker();
 177  
 
 178  
         AccessControlContext context = AccessController.getContext();
 179  
         job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context);
 180  
         
 181  
         if (worker==null)
 182  
         {
 183  
             queue.push(job);
 184  
         }
 185  
         else
 186  
         {
 187  
             try
 188  
             {
 189  
                 synchronized (worker)
 190  
                 {
 191  
                     worker.setJob(job, context);
 192  
 
 193  
                     if (job.getTimeout() > 0) {
 194  
                         workersMonitored.add(worker);
 195  
                     }
 196  
 
 197  
                     worker.notify();
 198  
                     runningJobs++;
 199  
                 }
 200  
             }
 201  
             catch (Throwable t)
 202  
             {
 203  
                 log.error("Worker exception", t);
 204  
             }
 205  
         }
 206  
     }
 207  
     
 208  
     /**
 209  
      * Wait for all rendering jobs in the collection to finish successfully or otherwise. 
 210  
      * @param renderingJobs the Collection of rendering job objects to wait for.
 211  
      */
 212  
     public void waitForRenderingJobs(List renderingJobs)
 213  
     {
 214  
         try 
 215  
         {
 216  
             for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
 217  
             {
 218  
                 RenderingJob job = (RenderingJob) iter.next();
 219  
                 PortletContent portletContent = job.getPortletContent();
 220  
                 
 221  
                 synchronized (portletContent) 
 222  
                 {
 223  
                     if (!portletContent.isComplete()) 
 224  
                     {
 225  
                         portletContent.wait();
 226  
                     }
 227  
                 }
 228  
             }
 229  
         }
 230  
         catch (Exception e)
 231  
         {
 232  
             log.error("Exception during synchronizing all portlet rendering jobs.", e);
 233  
         }
 234  
     }
 235  
 
 236  
     /**
 237  
      * Put back the worker in the idle queue unless there are pending jobs and
 238  
      * worker can still be committed to a new job before being released.
 239  
      */
 240  
     protected void release(Worker worker)
 241  
     {
 242  
         // if worker can still proces some jobs assign the first
 243  
         // backlog job to this worker, else reset job count and put
 244  
         // it on the idle queue.
 245  
 
 246  
         long jobTimeout = 0;
 247  
 
 248  
         RenderingJob oldJob = (RenderingJob) worker.getJob();
 249  
         if (oldJob != null)
 250  
         {
 251  
             jobTimeout = oldJob.getTimeout();
 252  
         }
 253  
 
 254  
         synchronized (worker)
 255  
         {
 256  
             RenderingJob job = null;
 257  
             
 258  
             if (worker.getJobCount() < this.maxJobsPerWorker)
 259  
             {
 260  
                 job = (RenderingJob) queue.pop();
 261  
                 
 262  
                 if (job != null)
 263  
                 {
 264  
                     AccessControlContext context = (AccessControlContext) job.getWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR);
 265  
                     worker.setJob(job, context);
 266  
                     runningJobs--;
 267  
                     return;
 268  
                 }
 269  
             }
 270  
             
 271  
             if (job == null)
 272  
             {
 273  
                 worker.setJob(null);
 274  
                 worker.resetJobCount();
 275  
                 runningJobs--;
 276  
             }
 277  
         }
 278  
 
 279  
         if (jobTimeout > 0) {
 280  
             workersMonitored.remove(worker);
 281  
         }
 282  
 
 283  
         synchronized (this.workers)
 284  
         {
 285  
             this.workers.push(worker);
 286  
         }
 287  
     }
 288  
 
 289  
     public int getQueuedJobsCount()
 290  
     {
 291  
         return queue.size();
 292  
     }
 293  
     
 294  
     /**
 295  
      * Returns a snapshot of the available jobs
 296  
      * @return available jobs
 297  
      */
 298  
     public int getAvailableJobsCount()
 299  
     {
 300  
         return workers.size();
 301  
     }
 302  
     
 303  
     public int getRunningJobsCount()
 304  
     {
 305  
         return this.tg.activeCount();
 306  
     }
 307  
     
 308  
     class RenderingJobTimeoutMonitor extends Thread {
 309  
 
 310  0
         long interval = 1000;
 311  0
         boolean shouldRun = true;
 312  
         
 313  0
         RenderingJobTimeoutMonitor(long interval) {
 314  0
             super("RenderingJobTimeoutMonitor");
 315  
 
 316  0
             if (interval > 0) {
 317  0
                 this.interval = interval;
 318  
             }
 319  0
         }
 320  
         /**
 321  
          * Thread.stop() is deprecated.
 322  
          * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread, 
 323  
          * effectively causing the thread to shutdown correctly.
 324  
          *
 325  
          */
 326  
         public void endThread()
 327  
         {
 328  0
         	shouldRun = false;
 329  0
         	this.interrupt();
 330  0
         }
 331  
         
 332  
         public void run() {
 333  0
             while (shouldRun) {
 334  
                 try 
 335  
                 {
 336  
                     // Because a timeout worker can be removed 
 337  
                     // in the workersMonitored collection during iterating,
 338  
                     // copy timeout workers in the following collection to kill later.
 339  
 
 340  0
                     List timeoutWorkers = new ArrayList();
 341  
 
 342  0
                     synchronized (workersMonitored) 
 343  
                     {
 344  0
                         for (Iterator it = workersMonitored.iterator(); it.hasNext(); )
 345  
                         {
 346  0
                             WorkerImpl worker = (WorkerImpl) it.next();
 347  0
                             RenderingJob job = (RenderingJob) worker.getJob();
 348  
                             
 349  0
                             if ((null != job) && (job.isTimeout()))
 350  
                             {
 351  0
                                 timeoutWorkers.add(worker);
 352  
                             }
 353  0
                         }
 354  0
                     }
 355  
 
 356  
                     // Now, we can kill the timeout worker(s).
 357  0
                     for (Iterator it = timeoutWorkers.iterator(); it.hasNext(); )
 358  
                     {
 359  0
                         WorkerImpl worker = (WorkerImpl) it.next();
 360  0
                         RenderingJob job = (RenderingJob) worker.getJob();
 361  
 
 362  
                         // If the job is just completed, then do not kill the worker.
 363  0
                         if ((null != job) && (job.isTimeout()))
 364  
                         {
 365  0
                             killJob(worker, job);
 366  
                         }
 367  0
                     }
 368  
                 } 
 369  0
                 catch (Exception e) 
 370  
                 {
 371  0
                     log.error("Exception during job monitoring.", e);
 372  0
                 }
 373  
                
 374  
                 try 
 375  
                 {
 376  0
                     synchronized (this) 
 377  
                     {
 378  0
                         wait(this.interval);
 379  0
                     }
 380  
                 } 
 381  0
                 catch (InterruptedException e) 
 382  
                 {
 383  
                     ;
 384  0
                 }
 385  
             }
 386  0
         }
 387  
 
 388  
         public void killJob(WorkerImpl worker, RenderingJob job) {
 389  
             try {
 390  0
                 if (log.isWarnEnabled()) {
 391  0
                     PortletWindow window = job.getWindow();
 392  0
                     ObjectID windowId = (null != window ? window.getId() : class="keyword">null);
 393  0
                     log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
 394  
                 }
 395  
 
 396  0
                 PortletContent content = job.getPortletContent();
 397  
                 
 398  0
                 synchronized (content)
 399  
                 {
 400  0
                     if (!content.isComplete()) {
 401  0
                         worker.interrupt();
 402  0
                         content.wait();
 403  
                     }
 404  0
                 }
 405  
                 
 406  0
             } catch (Exception e) {
 407  0
                 log.error("Exceptiong during job killing.", e);
 408  0
             }
 409  0
         }
 410  
 
 411  
     }
 412  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.