View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.jetspeed.aggregator.impl;
19  
20  import java.security.AccessControlContext;
21  import java.security.AccessController;
22  import java.util.List;
23  import java.util.ArrayList;
24  import java.util.Iterator;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Map;
28  import java.util.HashMap;
29  import java.util.Arrays;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jetspeed.aggregator.RenderingJob;
34  import org.apache.jetspeed.aggregator.Worker;
35  import org.apache.jetspeed.aggregator.WorkerMonitor;
36  import org.apache.jetspeed.aggregator.PortletContent;
37  
38  import org.apache.pluto.om.window.PortletWindow;
39  import org.apache.pluto.om.common.ObjectID;
40  
41  import commonj.work.WorkManager;
42  import commonj.work.Work;
43  import commonj.work.WorkItem;
44  import commonj.work.WorkListener;
45  import commonj.work.WorkEvent;
46  
47  /***
48   * The CommonjWorkerMonitorImpl is responsible for dispatching jobs to workers
49   * It wraps CommonJ WorkManager supported by IBM WebSphere and BEA WebLogic sever.
50   *
51   * @author <a href="mailto:woon_san@apache.org">Woonsan Ko</a>
52   * @version $Id: CommonjWorkerMonitorImpl.java 568339 2007-08-22 00:14:51Z ate $
53   */
54  public class CommonjWorkerMonitorImpl implements WorkerMonitor, WorkListener
55  {
56  
57      public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
58      public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class.getName();
59      public static final String WORKER_THREAD_ATTR = Worker.class.getName();
60      
61      /*** CommonJ Work Manamger provided by JavaEE container */
62      protected WorkManager workManager;
63  
64      /*** If true, invoke interrupt() on the worker thread when the job is timeout. */
65      protected boolean interruptOnTimeout = true;
66      
67      /*** Enable rendering job works monitor thread for timeout checking */
68      protected boolean jobWorksMonitorEnabled = true;
69      
70      /*** Rendering job works to be monitored for timeout checking */
71      protected Map jobWorksMonitored = Collections.synchronizedMap(new HashMap());
72      
73      public CommonjWorkerMonitorImpl(WorkManager workManager)
74      {
75          this(workManager, true);
76      }
77      
78      public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled)
79      {
80          this(workManager, jobWorksMonitorEnabled, true);
81      }
82      
83      public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled, boolean interruptOnTimeout)
84      {
85          this.workManager = workManager;
86          this.jobWorksMonitorEnabled = jobWorksMonitorEnabled;
87          this.interruptOnTimeout = interruptOnTimeout;
88      }
89      
90      /*** Commons logging */
91      protected final static Log log = LogFactory.getLog(CommonjWorkerMonitorImpl.class);
92      
93      /*** Renering Job Timeout monitor */
94      protected CommonjWorkerRenderingJobTimeoutMonitor jobMonitor = null;
95      
96      public void start()
97      {
98          if (this.jobWorksMonitorEnabled)
99          {
100             jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
101             jobMonitor.start();
102         }
103     }
104 
105     public void stop()
106     {
107         if (jobMonitor != null)
108         {
109             jobMonitor.endThread();
110         }
111 
112         jobMonitor = null;
113     }
114     
115     /***
116      * Assign a job to a worker and execute it or queue the job if no
117      * worker is available.
118      *
119      * @param job the Job to process
120      */
121     public void process(RenderingJob job)
122     {
123         AccessControlContext context = AccessController.getContext();
124         job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context);
125         
126         try
127         {
128             RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(job);
129             WorkItem workItem = this.workManager.schedule(jobWork, this);
130             job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
131             
132             if (this.jobWorksMonitorEnabled)
133             {
134                 this.jobWorksMonitored.put(workItem, jobWork);
135             }
136         }
137         catch (Throwable t)
138         {
139             log.error("Worker exception", t);
140         }
141     }
142 
143     public int getQueuedJobsCount()
144     {
145         return 0;
146     }
147     
148     /***
149      * Wait for all rendering jobs in the collection to finish successfully or otherwise. 
150      * @param renderingJobs the Collection of rendering job objects to wait for.
151      */
152     public void waitForRenderingJobs(List renderingJobs)
153     {
154         if (this.jobWorksMonitorEnabled)
155         {
156             try 
157             {
158                 for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
159                 {
160                     RenderingJob job = (RenderingJob) iter.next();
161                     PortletContent portletContent = job.getPortletContent();
162                     
163                     synchronized (portletContent) 
164                     {
165                         if (!portletContent.isComplete()) 
166                         {
167                             portletContent.wait();
168                         }
169                     }
170                 }
171             }
172             catch (Exception e)
173             {
174                 log.error("Exception during synchronizing all portlet rendering jobs.", e);
175             }
176         }
177         else
178         {
179             // We cannot use WorkingManager#waitForAll(workitems, timeout_ms) for timeout.
180             // The second argument could be either WorkManager.IMMEDIATE or WorkManager.INDEFINITE.
181             
182             try
183             {
184                 if (!renderingJobs.isEmpty())
185                 {
186                     Object lock = new Object();
187                     MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(lock, renderingJobs);
188                     
189                     synchronized (lock)
190                     {
191                         WorkItem monitorWorkItem = this.workManager.schedule(monitoringWork, this);
192                         lock.wait();
193                     }
194                 }
195             }
196             catch (Exception e)
197             {
198                 log.error("Exception during synchronizing all portlet rendering jobs.", e);
199             }
200         }
201     }
202     
203     /***
204      * Returns a snapshot of the available jobs
205      * @return available jobs
206      */
207     public int getAvailableJobsCount()
208     {
209         return 0;
210     }
211     
212     public int getRunningJobsCount()
213     {
214         return 0;
215     }
216     
217     // commonj.work.WorkListener implementations
218     
219     public void workAccepted(WorkEvent we)
220     {
221         WorkItem workItem = we.getWorkItem();
222         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workAccepted: " + workItem);
223     }
224 
225     public void workRejected(WorkEvent we)
226     {
227         WorkItem workItem = we.getWorkItem();
228         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workRejected: " + workItem);
229         
230         if (this.jobWorksMonitorEnabled)
231         {
232             removeMonitoredJobWork(workItem);
233         }
234     }
235 
236     public void workStarted(WorkEvent we)
237     {
238         WorkItem workItem = we.getWorkItem();
239         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workStarted: " + workItem);
240     }
241 
242     public void workCompleted(WorkEvent we)
243     {
244         WorkItem workItem = we.getWorkItem();
245         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workCompleted: " + workItem);
246         
247         if (this.jobWorksMonitorEnabled)
248         {
249             removeMonitoredJobWork(workItem);
250         }
251     }
252     
253     protected Object removeMonitoredJobWork(WorkItem workItem)
254     {
255         return this.jobWorksMonitored.remove(workItem);
256     }
257     
258     class RenderingJobCommonjWork implements Work
259     {
260 
261         protected RenderingJob job;
262 
263         public RenderingJobCommonjWork(RenderingJob job)
264         {
265             this.job = job;
266         }
267 
268         public boolean isDaemon()
269         {
270             return false;
271         }
272         
273         public void run()
274         {
275             if (jobWorksMonitorEnabled || interruptOnTimeout)
276             {
277                 this.job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread.currentThread());
278             }
279             
280             this.job.run();
281         }
282         
283         public void release()
284         {
285         }
286         
287         public RenderingJob getRenderingJob()
288         {
289             return this.job;
290         }
291     }
292 
293     class MonitoringJobCommonjWork implements Work
294     {
295         
296         protected Object lock;
297         protected List renderingJobs;
298 
299         public MonitoringJobCommonjWork(Object lock, List jobs)
300         {
301             this.lock = lock;
302             this.renderingJobs = new ArrayList(jobs);
303         }
304         
305         public boolean isDaemon()
306         {
307             return false;
308         }
309         
310         public void run()
311         {
312             try
313             {
314                 while (!this.renderingJobs.isEmpty())
315                 {
316                     for (Iterator it = this.renderingJobs.iterator(); it.hasNext(); )
317                     {
318                         RenderingJob job = (RenderingJob) it.next();
319                         WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
320                         int status = WorkEvent.WORK_ACCEPTED;
321                         
322                         if (workItem != null)
323                         {
324                             status = workItem.getStatus();
325                         }
326                         
327                         boolean isTimeout = job.isTimeout();
328                         
329                         if (isTimeout)
330                         {
331                             PortletContent content = job.getPortletContent();
332                             
333                             if (interruptOnTimeout)
334                             {
335                                 Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
336                                 
337                                 if (worker != null)
338                                 {
339                                     synchronized (content)
340                                     {
341                                         if (!content.isComplete()) {
342                                             worker.interrupt();
343                                             content.wait();
344                                         }
345                                     }
346                                 }
347                             }
348                             else
349                             {
350                                 synchronized (content)
351                                 {
352                                     content.completeWithError();
353                                 }
354                             }
355                         }
356                         
357                         if (status == WorkEvent.WORK_COMPLETED || status == WorkEvent.WORK_REJECTED || isTimeout)
358                         {
359                             it.remove();
360                         }                    
361                     }
362                     
363                     if (!this.renderingJobs.isEmpty())
364                     {
365                         synchronized (this)
366                         {
367                             wait(100);
368                         }
369                     }
370                 }
371                 
372                 synchronized (this.lock)
373                 {
374                     this.lock.notify();
375                 }
376             }
377             catch (Exception e)
378             {
379                 log.error("Exceptiong during job timeout monitoring.", e);
380             }
381         }
382         
383         public void release()
384         {
385         }
386         
387     }
388 
389     class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
390 
391         long interval = 1000;
392         boolean shouldRun = true;
393         
394         CommonjWorkerRenderingJobTimeoutMonitor(long interval) 
395         {
396             super("CommonjWorkerRenderingJobTimeoutMonitor");
397 
398             if (interval > 0) 
399             {
400                 this.interval = interval;
401             }
402         }
403         /***
404          * Thread.stop() is deprecated.
405          * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread, 
406          * effectively causing the thread to shutdown correctly.
407          *
408          */
409         public void endThread()
410         {
411         	shouldRun = false;
412         	this.interrupt();
413         }
414         
415         public void run() {
416             while (shouldRun) {
417                 try 
418                 {
419                     List timeoutJobWorks = new ArrayList();
420                     Collection jobWorks = Arrays.asList(jobWorksMonitored.values().toArray());
421                     
422                     for (Iterator it = jobWorks.iterator(); it.hasNext(); )
423                     {
424                         RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
425                         RenderingJob job = jobWork.getRenderingJob();
426                         
427                         if (job.isTimeout())
428                         {
429                             timeoutJobWorks.add(jobWork);
430                         }
431                     }
432                     
433                     // Now, we can kill the timeout worker(s).
434                     for (Iterator it = timeoutJobWorks.iterator(); it.hasNext(); )
435                     {
436                         RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
437                         RenderingJob job = jobWork.getRenderingJob();
438 
439                         // If the job is just completed, then do not kill the worker.
440                         if (job.isTimeout())
441                         {
442                             killJobWork(jobWork);
443                         }
444                     }
445                 } 
446                 catch (Exception e) 
447                 {
448                     log.error("Exception during job monitoring.", e);
449                 }
450                
451                 try 
452                 {
453                     synchronized (this) 
454                     {
455                         wait(this.interval);
456                     }
457                 } 
458                 catch (InterruptedException e) 
459                 {
460                     ;
461                 }
462             }
463         }
464         
465         public void killJobWork(RenderingJobCommonjWork jobWork) {
466             RenderingJob job = jobWork.getRenderingJob();
467             
468             try {
469                 if (log.isWarnEnabled()) {
470                     PortletWindow window = job.getWindow();
471                     ObjectID windowId = (null != window ? window.getId() : null);
472                     log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
473                 }
474 
475                 PortletContent content = job.getPortletContent();
476                 Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
477                 
478                 if (worker != null)
479                 {
480                     synchronized (content)
481                     {
482                         if (!content.isComplete()) {
483                             worker.interrupt();
484                             content.wait();
485                         }
486                     }
487                 }
488             } catch (Exception e) {
489                 log.error("Exceptiong during job killing.", e);
490             } finally {
491                 WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
492                 
493                 if (workItem != null)
494                 {
495                     removeMonitoredJobWork(workItem);
496                 }
497             }
498         }
499         
500     }
501     
502 }