View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.jetspeed.aggregator.impl;
19  
20  import java.security.AccessControlContext;
21  import java.security.AccessController;
22  import java.util.Iterator;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Stack;
26  import java.util.LinkedList;
27  import java.util.Collections;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.jetspeed.aggregator.RenderingJob;
32  import org.apache.jetspeed.aggregator.Worker;
33  import org.apache.jetspeed.aggregator.WorkerMonitor;
34  import org.apache.jetspeed.aggregator.PortletContent;
35  import org.apache.jetspeed.util.Queue;
36  import org.apache.jetspeed.util.FIFOQueue;
37  
38  import org.apache.pluto.om.window.PortletWindow;
39  import org.apache.pluto.om.common.ObjectID;
40  
/**
 * The WorkerMonitor is responsible for dispatching jobs to workers.
 * It uses an Apache HTTPd configuration style of min/max/spare worker
 * threads to throttle the rendering work.
 * If jobs come in faster than they can be processed, they are stored in a queue
 * which is flushed periodically by a QueueMonitor.
 *
 * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a>
 * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
 * @version $Id: WorkerMonitorImpl.java 591867 2007-11-05 02:20:06Z woonsan $
 */
52  public class WorkerMonitorImpl implements WorkerMonitor
53  {
54      public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
55  
56      public WorkerMonitorImpl(int minWorkers, int maxWorkers, int spareWorkers, int maxJobsPerWorker)
57      {
58          this.minWorkers = minWorkers;
59          this.maxWorkers = maxWorkers;
60          this.spareWorkers = spareWorkers;
61          this.maxJobsPerWorker = maxJobsPerWorker;
62      }
63      
64      /*** Commons logging */
65      protected final static Log log = LogFactory.getLog(WorkerMonitorImpl.class);
66  
67      /*** Static counters for identifying workers */
68      protected static long sCount = 0;
69  
70      /*** Count of running jobs **/
71      protected int runningJobs = 0;
72      
73      /*** Minimum number of wokers to create */
74      protected int minWorkers = 5;
75  
76      /*** Maximum number of workers */
77      protected int maxWorkers = 50;
78  
79      /*** Minimum amount of spare workers */
80      protected int spareWorkers = 3;
81  
82      /*** Maximum of job processed by a worker before being released */
83      protected int maxJobsPerWorker = 10;
84  
85      /*** Stack containing currently idle workers */
86      protected Stack workers = new Stack();
87  
88      /*** The thread group used to group all worker threads */
89      protected ThreadGroup tg = new ThreadGroup("Workers");
90  
91      /*** Job queue */
92      protected Queue queue;
93  
94      /*** Workers to be monitored for timeout checking */
95      protected List workersMonitored = Collections.synchronizedList(new LinkedList());
96  
97      /*** Renering Job Timeout monitor */
98      protected RenderingJobTimeoutMonitor jobMonitor = null;
99  
100     public void start()
101     {
102         addWorkers(this.minWorkers);
103         this.queue = new FIFOQueue();
104 
105         jobMonitor = new RenderingJobTimeoutMonitor(1000);
106         jobMonitor.start();
107     }
108 
109     public void stop()
110     {    
111     	if (jobMonitor != null)
112     		jobMonitor.endThread();
113     	jobMonitor = null;
114     	
115     }
116 
117     /***
118      * Create the request number of workers and add them to
119      * list of available workers.
120      *
121      * @param wCount the number of workers to create
122      */
123     protected synchronized void addWorkers(int wCount)
124     {
125         int wCurrent = this.tg.activeCount();
126 
127         if (wCurrent < maxWorkers)
128         {
129             if (wCurrent + wCount > maxWorkers)
130             {
131                 wCount = maxWorkers - wCurrent;
132             }
133 
134             log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
135 
136             for (int i = 0; i < wCount; ++i)
137             {
138                 Worker worker = new WorkerImpl(this, this.tg, "WORKER_" + (++sCount));
139                 worker.start();
140                 workers.push(worker);
141             }
142         }
143     }
144 
145     /***
146      * Retrieves an idle worker
147      *
148      * @return a Worker from the idle pool or null if non available
149      */
150     protected Worker getWorker()
151     {
152         synchronized(this.workers)
153         {
154             if (this.workers.size() < spareWorkers)
155             {
156                 addWorkers(spareWorkers);
157             }
158 
159             if (this.workers.size() == 0)
160             {
161                 return null;
162             }
163 
164             return (Worker)workers.pop();
165         }
166     }
167 
168     /***
169      * Assign a job to a worker and execute it or queue the job if no
170      * worker is available.
171      *
172      * @param job the Job to process
173      */
174     public void process(RenderingJob job)
175     {
176         Worker worker = this.getWorker();
177 
178         AccessControlContext context = AccessController.getContext();
179         job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context);
180         
181         if (worker==null)
182         {
183             queue.push(job);
184         }
185         else
186         {
187             try
188             {
189                 synchronized (worker)
190                 {
191                     worker.setJob(job, context);
192 
193                     if (job.getTimeout() > 0) {
194                         workersMonitored.add(worker);
195                     }
196 
197                     worker.notify();
198                     runningJobs++;
199                 }
200             }
201             catch (Throwable t)
202             {
203                 log.error("Worker exception", t);
204             }
205         }
206     }
207     
208     /***
209      * Wait for all rendering jobs in the collection to finish successfully or otherwise. 
210      * @param renderingJobs the Collection of rendering job objects to wait for.
211      */
212     public void waitForRenderingJobs(List renderingJobs)
213     {
214         try 
215         {
216             for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
217             {
218                 RenderingJob job = (RenderingJob) iter.next();
219                 PortletContent portletContent = job.getPortletContent();
220                 
221                 synchronized (portletContent) 
222                 {
223                     if (!portletContent.isComplete()) 
224                     {
225                         portletContent.wait();
226                     }
227                 }
228             }
229         }
230         catch (Exception e)
231         {
232             log.error("Exception during synchronizing all portlet rendering jobs.", e);
233         }
234     }
235 
236     /***
237      * Put back the worker in the idle queue unless there are pending jobs and
238      * worker can still be committed to a new job before being released.
239      */
240     protected void release(Worker worker)
241     {
242         // if worker can still proces some jobs assign the first
243         // backlog job to this worker, else reset job count and put
244         // it on the idle queue.
245 
246         long jobTimeout = 0;
247 
248         RenderingJob oldJob = (RenderingJob) worker.getJob();
249         if (oldJob != null)
250         {
251             jobTimeout = oldJob.getTimeout();
252         }
253 
254         synchronized (worker)
255         {
256             RenderingJob job = null;
257             
258             if (worker.getJobCount() < this.maxJobsPerWorker)
259             {
260                 job = (RenderingJob) queue.pop();
261                 
262                 if (job != null)
263                 {
264                     AccessControlContext context = (AccessControlContext) job.getWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR);
265                     worker.setJob(job, context);
266                     runningJobs--;
267                     return;
268                 }
269             }
270             
271             if (job == null)
272             {
273                 worker.setJob(null);
274                 worker.resetJobCount();
275                 runningJobs--;
276             }
277         }
278 
279         if (jobTimeout > 0) {
280             workersMonitored.remove(worker);
281         }
282 
283         synchronized (this.workers)
284         {
285             this.workers.push(worker);
286         }
287     }
288 
289     public int getQueuedJobsCount()
290     {
291         return queue.size();
292     }
293     
294     /***
295      * Returns a snapshot of the available jobs
296      * @return available jobs
297      */
298     public int getAvailableJobsCount()
299     {
300         return workers.size();
301     }
302     
303     public int getRunningJobsCount()
304     {
305         return this.tg.activeCount();
306     }
307     
308     class RenderingJobTimeoutMonitor extends Thread {
309 
310         long interval = 1000;
311         boolean shouldRun = true;
312         
313         RenderingJobTimeoutMonitor(long interval) {
314             super("RenderingJobTimeoutMonitor");
315 
316             if (interval > 0) {
317                 this.interval = interval;
318             }
319         }
320         /***
321          * Thread.stop() is deprecated.
322          * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread, 
323          * effectively causing the thread to shutdown correctly.
324          *
325          */
326         public void endThread()
327         {
328         	shouldRun = false;
329         	this.interrupt();
330         }
331         
332         public void run() {
333             while (shouldRun) {
334                 try 
335                 {
336                     // Because a timeout worker can be removed 
337                     // in the workersMonitored collection during iterating,
338                     // copy timeout workers in the following collection to kill later.
339 
340                     List timeoutWorkers = new ArrayList();
341 
342                     synchronized (workersMonitored) 
343                     {
344                         for (Iterator it = workersMonitored.iterator(); it.hasNext(); )
345                         {
346                             WorkerImpl worker = (WorkerImpl) it.next();
347                             RenderingJob job = (RenderingJob) worker.getJob();
348                             
349                             if ((null != job) && (job.isTimeout()))
350                             {
351                                 timeoutWorkers.add(worker);
352                             }
353                         }
354                     }
355 
356                     // Now, we can kill the timeout worker(s).
357                     for (Iterator it = timeoutWorkers.iterator(); it.hasNext(); )
358                     {
359                         WorkerImpl worker = (WorkerImpl) it.next();
360                         RenderingJob job = (RenderingJob) worker.getJob();
361 
362                         // If the job is just completed, then do not kill the worker.
363                         if ((null != job) && (job.isTimeout()))
364                         {
365                             killJob(worker, job);
366                         }
367                     }
368                 } 
369                 catch (Exception e) 
370                 {
371                     log.error("Exception during job monitoring.", e);
372                 }
373                
374                 try 
375                 {
376                     synchronized (this) 
377                     {
378                         wait(this.interval);
379                     }
380                 } 
381                 catch (InterruptedException e) 
382                 {
383                     ;
384                 }
385             }
386         }
387 
388         public void killJob(WorkerImpl worker, RenderingJob job) {
389             try {
390                 if (log.isWarnEnabled()) {
391                     PortletWindow window = job.getWindow();
392                     ObjectID windowId = (null != window ? window.getId() : null);
393                     log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
394                 }
395 
396                 PortletContent content = job.getPortletContent();
397                 
398                 synchronized (content)
399                 {
400                     if (!content.isComplete()) {
401                         worker.interrupt();
402                         content.wait();
403                     }
404                 }
405                 
406             } catch (Exception e) {
407                 log.error("Exceptiong during job killing.", e);
408             }
409         }
410 
411     }
412 }