Coverage report

  %line %branch
org.apache.jetspeed.aggregator.impl.CommonjWorkerMonitorImpl$MonitoringJobCommonjWork
0% 
0% 

 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  * 
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  * 
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 
 18  
 package org.apache.jetspeed.aggregator.impl;
 19  
 
 20  
 import java.security.AccessControlContext;
 21  
 import java.security.AccessController;
 22  
 import java.util.List;
 23  
 import java.util.ArrayList;
 24  
 import java.util.Iterator;
 25  
 import java.util.Collection;
 26  
 import java.util.Collections;
 27  
 import java.util.Map;
 28  
 import java.util.HashMap;
 29  
 import java.util.Arrays;
 30  
 
 31  
 import org.apache.commons.logging.Log;
 32  
 import org.apache.commons.logging.LogFactory;
 33  
 import org.apache.jetspeed.aggregator.RenderingJob;
 34  
 import org.apache.jetspeed.aggregator.Worker;
 35  
 import org.apache.jetspeed.aggregator.WorkerMonitor;
 36  
 import org.apache.jetspeed.aggregator.PortletContent;
 37  
 
 38  
 import org.apache.pluto.om.window.PortletWindow;
 39  
 import org.apache.pluto.om.common.ObjectID;
 40  
 
 41  
 import commonj.work.WorkManager;
 42  
 import commonj.work.Work;
 43  
 import commonj.work.WorkItem;
 44  
 import commonj.work.WorkListener;
 45  
 import commonj.work.WorkEvent;
 46  
 
 47  
 /**
 48  
  * The CommonjWorkerMonitorImpl is responsible for dispatching jobs to workers
 49  
  * It wraps CommonJ WorkManager supported by IBM WebSphere and BEA WebLogic server.
 50  
  *
 51  
  * @author <a href="mailto:woon_san@apache.org">Woonsan Ko</a>
 52  
  * @version $Id: CommonjWorkerMonitorImpl.java 568339 2007-08-22 00:14:51Z ate $
 53  
  */
 54  
 public class CommonjWorkerMonitorImpl implements WorkerMonitor, WorkListener
 55  
 {
 56  
 
 57  
     public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
 58  
     public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class.getName();
 59  
     public static final String WORKER_THREAD_ATTR = Worker.class.getName();
 60  
     
 61  
     /** CommonJ Work Manager provided by JavaEE container */
 62  
     protected WorkManager workManager;
 63  
 
 64  
     /** If true, invoke interrupt() on the worker thread when the job is timeout. */
 65  
     protected boolean interruptOnTimeout = true;
 66  
     
 67  
     /** Enable rendering job works monitor thread for timeout checking */
 68  
     protected boolean jobWorksMonitorEnabled = true;
 69  
     
 70  
     /** Rendering job works to be monitored for timeout checking */
 71  
     protected Map jobWorksMonitored = Collections.synchronizedMap(new HashMap());
 72  
     
 73  
     public CommonjWorkerMonitorImpl(WorkManager workManager)
 74  
     {
 75  
         this(workManager, true);
 76  
     }
 77  
     
 78  
     public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled)
 79  
     {
 80  
         this(workManager, jobWorksMonitorEnabled, true);
 81  
     }
 82  
     
 83  
     public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled, boolean interruptOnTimeout)
 84  
     {
 85  
         this.workManager = workManager;
 86  
         this.jobWorksMonitorEnabled = jobWorksMonitorEnabled;
 87  
         this.interruptOnTimeout = interruptOnTimeout;
 88  
     }
 89  
     
 90  
     /** Commons logging */
 91  
     protected final static Log log = LogFactory.getLog(CommonjWorkerMonitorImpl.class);
 92  
     
 93  
     /** Rendering Job Timeout monitor */
 94  
     protected CommonjWorkerRenderingJobTimeoutMonitor jobMonitor = null;
 95  
     
 96  
     public void start()
 97  
     {
 98  
         if (this.jobWorksMonitorEnabled)
 99  
         {
 100  
             jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
 101  
             jobMonitor.start();
 102  
         }
 103  
     }
 104  
 
 105  
     public void stop()
 106  
     {
 107  
         if (jobMonitor != null)
 108  
         {
 109  
             jobMonitor.endThread();
 110  
         }
 111  
 
 112  
         jobMonitor = null;
 113  
     }
 114  
     
 115  
     /**
 116  
      * Assign a job to a worker and execute it or queue the job if no
 117  
      * worker is available.
 118  
      *
 119  
      * @param job the Job to process
 120  
      */
 121  
     public void process(RenderingJob job)
 122  
     {
 123  
         AccessControlContext context = AccessController.getContext();
 124  
         job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context);
 125  
         
 126  
         try
 127  
         {
 128  
             RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(job);
 129  
             WorkItem workItem = this.workManager.schedule(jobWork, this);
 130  
             job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
 131  
             
 132  
             if (this.jobWorksMonitorEnabled)
 133  
             {
 134  
                 this.jobWorksMonitored.put(workItem, jobWork);
 135  
             }
 136  
         }
 137  
         catch (Throwable t)
 138  
         {
 139  
             log.error("Worker exception", t);
 140  
         }
 141  
     }
 142  
 
 143  
     public int getQueuedJobsCount()
 144  
     {
 145  
         return 0;
 146  
     }
 147  
     
 148  
     /**
 149  
      * Wait for all rendering jobs in the collection to finish successfully or otherwise. 
 150  
      * @param renderingJobs the Collection of rendering job objects to wait for.
 151  
      */
 152  
     public void waitForRenderingJobs(List renderingJobs)
 153  
     {
 154  
         if (this.jobWorksMonitorEnabled)
 155  
         {
 156  
             try 
 157  
             {
 158  
                 for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
 159  
                 {
 160  
                     RenderingJob job = (RenderingJob) iter.next();
 161  
                     PortletContent portletContent = job.getPortletContent();
 162  
                     
 163  
                     synchronized (portletContent) 
 164  
                     {
 165  
                         if (!portletContent.isComplete()) 
 166  
                         {
 167  
                             portletContent.wait();
 168  
                         }
 169  
                     }
 170  
                 }
 171  
             }
 172  
             catch (Exception e)
 173  
             {
 174  
                 log.error("Exception during synchronizing all portlet rendering jobs.", e);
 175  
             }
 176  
         }
 177  
         else
 178  
         {
 179  
             // We cannot use WorkManager#waitForAll(workitems, timeout_ms) for timeout.
 180  
             // The second argument could be either WorkManager.IMMEDIATE or WorkManager.INDEFINITE.
 181  
             
 182  
             try
 183  
             {
 184  
                 if (!renderingJobs.isEmpty())
 185  
                 {
 186  
                     Object lock = new Object();
 187  
                     MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(lock, renderingJobs);
 188  
                     
 189  
                     synchronized (lock)
 190  
                     {
 191  
                         WorkItem monitorWorkItem = this.workManager.schedule(monitoringWork, this);
 192  
                         lock.wait();
 193  
                     }
 194  
                 }
 195  
             }
 196  
             catch (Exception e)
 197  
             {
 198  
                 log.error("Exception during synchronizing all portlet rendering jobs.", e);
 199  
             }
 200  
         }
 201  
     }
 202  
     
 203  
     /**
 204  
      * Returns a snapshot of the available jobs
 205  
      * @return available jobs
 206  
      */
 207  
     public int getAvailableJobsCount()
 208  
     {
 209  
         return 0;
 210  
     }
 211  
     
 212  
     public int getRunningJobsCount()
 213  
     {
 214  
         return 0;
 215  
     }
 216  
     
 217  
     // commonj.work.WorkListener implementations
 218  
     
 219  
     public void workAccepted(WorkEvent we)
 220  
     {
 221  
         WorkItem workItem = we.getWorkItem();
 222  
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workAccepted: " + workItem);
 223  
     }
 224  
 
 225  
     public void workRejected(WorkEvent we)
 226  
     {
 227  
         WorkItem workItem = we.getWorkItem();
 228  
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workRejected: " + workItem);
 229  
         
 230  
         if (this.jobWorksMonitorEnabled)
 231  
         {
 232  
             removeMonitoredJobWork(workItem);
 233  
         }
 234  
     }
 235  
 
 236  
     public void workStarted(WorkEvent we)
 237  
     {
 238  
         WorkItem workItem = we.getWorkItem();
 239  
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workStarted: " + workItem);
 240  
     }
 241  
 
 242  
     public void workCompleted(WorkEvent we)
 243  
     {
 244  
         WorkItem workItem = we.getWorkItem();
 245  
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workCompleted: " + workItem);
 246  
         
 247  
         if (this.jobWorksMonitorEnabled)
 248  
         {
 249  
             removeMonitoredJobWork(workItem);
 250  
         }
 251  
     }
 252  
     
 253  
     protected Object removeMonitoredJobWork(WorkItem workItem)
 254  
     {
 255  
         return this.jobWorksMonitored.remove(workItem);
 256  
     }
 257  
     
 258  
     class RenderingJobCommonjWork implements Work
 259  
     {
 260  
 
 261  
         protected RenderingJob job;
 262  
 
 263  
         public RenderingJobCommonjWork(RenderingJob job)
 264  
         {
 265  
             this.job = job;
 266  
         }
 267  
 
 268  
         public boolean isDaemon()
 269  
         {
 270  
             return false;
 271  
         }
 272  
         
 273  
         public void run()
 274  
         {
 275  
             if (jobWorksMonitorEnabled || interruptOnTimeout)
 276  
             {
 277  
                 this.job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread.currentThread());
 278  
             }
 279  
             
 280  
             this.job.run();
 281  
         }
 282  
         
 283  
         public void release()
 284  
         {
 285  
         }
 286  
         
 287  
         public RenderingJob getRenderingJob()
 288  
         {
 289  
             return this.job;
 290  
         }
 291  
     }
 292  
 
 293  
     class MonitoringJobCommonjWork implements Work
 294  
     {
 295  
         
 296  
         protected Object lock;
 297  
         protected List renderingJobs;
 298  
 
 299  
         public MonitoringJobCommonjWork(Object lock, List jobs)
 300  0
         {
 301  0
             this.lock = lock;
 302  0
             this.renderingJobs = new ArrayList(jobs);
 303  0
         }
 304  
         
 305  
         public boolean isDaemon()
 306  
         {
 307  0
             return false;
 308  
         }
 309  
         
 310  
         public void run()
 311  
         {
 312  
             try
 313  
             {
 314  0
                 while (!this.renderingJobs.isEmpty())
 315  
                 {
 316  0
                     for (Iterator it = this.renderingJobs.iterator(); it.hasNext(); )
 317  
                     {
 318  0
                         RenderingJob job = (RenderingJob) it.next();
 319  0
                         WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
 320  0
                         int status = WorkEvent.WORK_ACCEPTED;
 321  
                         
 322  0
                         if (workItem != null)
 323  
                         {
 324  0
                             status = workItem.getStatus();
 325  
                         }
 326  
                         
 327  0
                         boolean isTimeout = job.isTimeout();
 328  
                         
 329  0
                         if (isTimeout)
 330  
                         {
 331  0
                             PortletContent content = job.getPortletContent();
 332  
                             
 333  0
                             if (interruptOnTimeout)
 334  
                             {
 335  0
                                 Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
 336  
                                 
 337  0
                                 if (worker != null)
 338  
                                 {
 339  0
                                     synchronized (content)
 340  
                                     {
 341  0
                                         if (!content.isComplete()) {
 342  0
                                             worker.interrupt();
 343  0
                                             content.wait();
 344  
                                         }
 345  0
                                     }
 346  
                                 }
 347  0
                             }
 348  
                             else
 349  
                             {
 350  0
                                 synchronized (content)
 351  
                                 {
 352  0
                                     content.completeWithError();
 353  0
                                 }
 354  
                             }
 355  
                         }
 356  
                         
 357  0
                         if (status == WorkEvent.WORK_COMPLETED || status == WorkEvent.WORK_REJECTED || isTimeout)
 358  
                         {
 359  0
                             it.remove();
 360  
                         }                    
 361  0
                     }
 362  
                     
 363  0
                     if (!this.renderingJobs.isEmpty())
 364  
                     {
 365  0
                         synchronized (this)
 366  
                         {
 367  0
                             wait(100);
 368  0
                         }
 369  
                     }
 370  
                 }
 371  
                 
 372  0
                 synchronized (this.lock)
 373  
                 {
 374  0
                     this.lock.notify();
 375  0
                 }
 376  
             }
 377  0
             catch (Exception e)
 378  
             {
 379  0
                 log.error("Exceptiong during job timeout monitoring.", e);
 380  0
             }
 381  0
         }
 382  
         
 383  
         public void release()
 384  
         {
 385  0
         }
 386  
         
 387  
     }
 388  
 
 389  
     class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
 390  
 
 391  
         long interval = 1000;
 392  
         boolean shouldRun = true;
 393  
         
 394  
         CommonjWorkerRenderingJobTimeoutMonitor(long interval) 
 395  
         {
 396  
             super("CommonjWorkerRenderingJobTimeoutMonitor");
 397  
 
 398  
             if (interval > 0) 
 399  
             {
 400  
                 this.interval = interval;
 401  
             }
 402  
         }
 403  
         /**
 404  
          * Thread.stop() is deprecated.
 405  
          * This method achieves the same by setting the run variable "shouldRun" to false and interrupting the Thread, 
 406  
          * effectively causing the thread to shutdown correctly.
 407  
          *
 408  
          */
 409  
         public void endThread()
 410  
         {
 411  
         	shouldRun = false;
 412  
         	this.interrupt();
 413  
         }
 414  
         
 415  
         public void run() {
 416  
             while (shouldRun) {
 417  
                 try 
 418  
                 {
 419  
                     List timeoutJobWorks = new ArrayList();
 420  
                     Collection jobWorks = Arrays.asList(jobWorksMonitored.values().toArray());
 421  
                     
 422  
                     for (Iterator it = jobWorks.iterator(); it.hasNext(); )
 423  
                     {
 424  
                         RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
 425  
                         RenderingJob job = jobWork.getRenderingJob();
 426  
                         
 427  
                         if (job.isTimeout())
 428  
                         {
 429  
                             timeoutJobWorks.add(jobWork);
 430  
                         }
 431  
                     }
 432  
                     
 433  
                     // Now, we can kill the timeout worker(s).
 434  
                     for (Iterator it = timeoutJobWorks.iterator(); it.hasNext(); )
 435  
                     {
 436  
                         RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
 437  
                         RenderingJob job = jobWork.getRenderingJob();
 438  
 
 439  
                         // If the job is just completed, then do not kill the worker.
 440  
                         if (job.isTimeout())
 441  
                         {
 442  
                             killJobWork(jobWork);
 443  
                         }
 444  
                     }
 445  
                 } 
 446  
                 catch (Exception e) 
 447  
                 {
 448  
                     log.error("Exception during job monitoring.", e);
 449  
                 }
 450  
                
 451  
                 try 
 452  
                 {
 453  
                     synchronized (this) 
 454  
                     {
 455  
                         wait(this.interval);
 456  
                     }
 457  
                 } 
 458  
                 catch (InterruptedException e) 
 459  
                 {
 460  
                     ;
 461  
                 }
 462  
             }
 463  
         }
 464  
         
 465  
         public void killJobWork(RenderingJobCommonjWork jobWork) {
 466  
             RenderingJob job = jobWork.getRenderingJob();
 467  
             
 468  
             try {
 469  
                 if (log.isWarnEnabled()) {
 470  
                     PortletWindow window = job.getWindow();
 471  
                     ObjectID windowId = (null != window ? window.getId() : null);
 472  
                     log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
 473  
                 }
 474  
 
 475  
                 PortletContent content = job.getPortletContent();
 476  
                 Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
 477  
                 
 478  
                 if (worker != null)
 479  
                 {
 480  
                     synchronized (content)
 481  
                     {
 482  
                         if (!content.isComplete()) {
 483  
                             worker.interrupt();
 484  
                             content.wait();
 485  
                         }
 486  
                     }
 487  
                 }
 488  
             } catch (Exception e) {
 489  
                 log.error("Exceptiong during job killing.", e);
 490  
             } finally {
 491  
                 WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
 492  
                 
 493  
                 if (workItem != null)
 494  
                 {
 495  
                     removeMonitoredJobWork(workItem);
 496  
                 }
 497  
             }
 498  
         }
 499  
         
 500  
     }
 501  
     
 502  
 }

This report is generated by jcoverage, Maven and Maven JCoverage Plugin.