1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.jetspeed.aggregator.impl;
19
20 import java.security.AccessControlContext;
21 import java.security.AccessController;
22 import java.util.Iterator;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Stack;
26 import java.util.LinkedList;
27 import java.util.Collections;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.jetspeed.aggregator.RenderingJob;
32 import org.apache.jetspeed.aggregator.Worker;
33 import org.apache.jetspeed.aggregator.WorkerMonitor;
34 import org.apache.jetspeed.aggregator.PortletContent;
35 import org.apache.jetspeed.util.Queue;
36 import org.apache.jetspeed.util.FIFOQueue;
37
38 import org.apache.pluto.om.window.PortletWindow;
39 import org.apache.pluto.om.common.ObjectID;
40
41 /***
42 * The WorkerMonitor is responsible for dispatching jobs to workers
43 * It uses an Apache HTTPd configuration style of min/max/spare workers
44 * threads to throttle the rendering work.
45 * If jobs come in faster than they can be processed, they are stored in a queue
46 * which is drained by the workers as they finish their current job.
47 *
48 * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a>
49 * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a>
50 * @version $Id: WorkerMonitorImpl.java 591867 2007-11-05 02:20:06Z woonsan $
51 */
52 public class WorkerMonitorImpl implements WorkerMonitor
53 {
54 public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
55
/**
 * Creates a monitor with the given pool sizing. No threads are created
 * here; the pool is populated by {@link #start()}.
 *
 * @param minWorkers       number of workers created when the monitor starts
 * @param maxWorkers       upper bound on the number of worker threads
 * @param spareWorkers     idle-worker threshold below which more workers are added
 * @param maxJobsPerWorker number of jobs a worker may chain before it is
 *                         returned to the idle pool
 */
public WorkerMonitorImpl(int minWorkers, int maxWorkers, int spareWorkers, int maxJobsPerWorker)
{
    // Pool size limits.
    this.maxWorkers = maxWorkers;
    this.minWorkers = minWorkers;
    this.spareWorkers = spareWorkers;

    // Per-worker job chaining limit.
    this.maxJobsPerWorker = maxJobsPerWorker;
}
63
64 /*** Commons logging */
65 protected final static Log log = LogFactory.getLog(WorkerMonitorImpl.class);
66
67 /*** Static counters for identifying workers */
68 protected static long sCount = 0;
69
70 /*** Count of running jobs **/
71 protected int runningJobs = 0;
72
73 /*** Minimum number of workers to create */
74 protected int minWorkers = 5;
75
76 /*** Maximum number of workers */
77 protected int maxWorkers = 50;
78
79 /*** Minimum amount of spare workers */
80 protected int spareWorkers = 3;
81
82 /*** Maximum number of jobs processed by a worker before being released */
83 protected int maxJobsPerWorker = 10;
84
85 /*** Stack containing currently idle workers */
86 protected Stack workers = new Stack();
87
88 /*** The thread group used to group all worker threads */
89 protected ThreadGroup tg = new ThreadGroup("Workers");
90
91 /*** Job queue */
92 protected Queue queue;
93
94 /*** Workers to be monitored for timeout checking */
95 protected List workersMonitored = Collections.synchronizedList(new LinkedList());
96
97 /*** Rendering job timeout monitor */
98 protected RenderingJobTimeoutMonitor jobMonitor = null;
99
100 public void start()
101 {
102 addWorkers(this.minWorkers);
103 this.queue = new FIFOQueue();
104
105 jobMonitor = new RenderingJobTimeoutMonitor(1000);
106 jobMonitor.start();
107 }
108
109 public void stop()
110 {
111 if (jobMonitor != null)
112 jobMonitor.endThread();
113 jobMonitor = null;
114
115 }
116
117 /***
118 * Create the request number of workers and add them to
119 * list of available workers.
120 *
121 * @param wCount the number of workers to create
122 */
123 protected synchronized void addWorkers(int wCount)
124 {
125 int wCurrent = this.tg.activeCount();
126
127 if (wCurrent < maxWorkers)
128 {
129 if (wCurrent + wCount > maxWorkers)
130 {
131 wCount = maxWorkers - wCurrent;
132 }
133
134 log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
135
136 for (int i = 0; i < wCount; ++i)
137 {
138 Worker worker = new WorkerImpl(this, this.tg, "WORKER_" + (++sCount));
139 worker.start();
140 workers.push(worker);
141 }
142 }
143 }
144
145 /***
146 * Retrieves an idle worker
147 *
148 * @return a Worker from the idle pool or null if non available
149 */
150 protected Worker getWorker()
151 {
152 synchronized(this.workers)
153 {
154 if (this.workers.size() < spareWorkers)
155 {
156 addWorkers(spareWorkers);
157 }
158
159 if (this.workers.size() == 0)
160 {
161 return null;
162 }
163
164 return (Worker)workers.pop();
165 }
166 }
167
168 /***
169 * Assign a job to a worker and execute it or queue the job if no
170 * worker is available.
171 *
172 * @param job the Job to process
173 */
174 public void process(RenderingJob job)
175 {
176 Worker worker = this.getWorker();
177
178 AccessControlContext context = AccessController.getContext();
179 job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context);
180
181 if (worker==null)
182 {
183 queue.push(job);
184 }
185 else
186 {
187 try
188 {
189 synchronized (worker)
190 {
191 worker.setJob(job, context);
192
193 if (job.getTimeout() > 0) {
194 workersMonitored.add(worker);
195 }
196
197 worker.notify();
198 runningJobs++;
199 }
200 }
201 catch (Throwable t)
202 {
203 log.error("Worker exception", t);
204 }
205 }
206 }
207
208 /***
209 * Wait for all rendering jobs in the collection to finish successfully or otherwise.
210 * @param renderingJobs the Collection of rendering job objects to wait for.
211 */
212 public void waitForRenderingJobs(List renderingJobs)
213 {
214 try
215 {
216 for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
217 {
218 RenderingJob job = (RenderingJob) iter.next();
219 PortletContent portletContent = job.getPortletContent();
220
221 synchronized (portletContent)
222 {
223 if (!portletContent.isComplete())
224 {
225 portletContent.wait();
226 }
227 }
228 }
229 }
230 catch (Exception e)
231 {
232 log.error("Exception during synchronizing all portlet rendering jobs.", e);
233 }
234 }
235
236 /***
237 * Put back the worker in the idle queue unless there are pending jobs and
238 * worker can still be committed to a new job before being released.
239 */
240 protected void release(Worker worker)
241 {
242
243
244
245
246 long jobTimeout = 0;
247
248 RenderingJob oldJob = (RenderingJob) worker.getJob();
249 if (oldJob != null)
250 {
251 jobTimeout = oldJob.getTimeout();
252 }
253
254 synchronized (worker)
255 {
256 RenderingJob job = null;
257
258 if (worker.getJobCount() < this.maxJobsPerWorker)
259 {
260 job = (RenderingJob) queue.pop();
261
262 if (job != null)
263 {
264 AccessControlContext context = (AccessControlContext) job.getWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR);
265 worker.setJob(job, context);
266 runningJobs--;
267 return;
268 }
269 }
270
271 if (job == null)
272 {
273 worker.setJob(null);
274 worker.resetJobCount();
275 runningJobs--;
276 }
277 }
278
279 if (jobTimeout > 0) {
280 workersMonitored.remove(worker);
281 }
282
283 synchronized (this.workers)
284 {
285 this.workers.push(worker);
286 }
287 }
288
289 public int getQueuedJobsCount()
290 {
291 return queue.size();
292 }
293
294 /***
295 * Returns a snapshot of the available jobs
296 * @return available jobs
297 */
298 public int getAvailableJobsCount()
299 {
300 return workers.size();
301 }
302
303 public int getRunningJobsCount()
304 {
305 return this.tg.activeCount();
306 }
307
308 class RenderingJobTimeoutMonitor extends Thread {
309
310 long interval = 1000;
311 boolean shouldRun = true;
312
313 RenderingJobTimeoutMonitor(long interval) {
314 super("RenderingJobTimeoutMonitor");
315
316 if (interval > 0) {
317 this.interval = interval;
318 }
319 }
320 /***
321 * Thread.stop() is deprecated.
322 * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread,
323 * effectively causing the thread to shutdown correctly.
324 *
325 */
326 public void endThread()
327 {
328 shouldRun = false;
329 this.interrupt();
330 }
331
332 public void run() {
333 while (shouldRun) {
334 try
335 {
336
337
338
339
340 List timeoutWorkers = new ArrayList();
341
342 synchronized (workersMonitored)
343 {
344 for (Iterator it = workersMonitored.iterator(); it.hasNext(); )
345 {
346 WorkerImpl worker = (WorkerImpl) it.next();
347 RenderingJob job = (RenderingJob) worker.getJob();
348
349 if ((null != job) && (job.isTimeout()))
350 {
351 timeoutWorkers.add(worker);
352 }
353 }
354 }
355
356
357 for (Iterator it = timeoutWorkers.iterator(); it.hasNext(); )
358 {
359 WorkerImpl worker = (WorkerImpl) it.next();
360 RenderingJob job = (RenderingJob) worker.getJob();
361
362
363 if ((null != job) && (job.isTimeout()))
364 {
365 killJob(worker, job);
366 }
367 }
368 }
369 catch (Exception e)
370 {
371 log.error("Exception during job monitoring.", e);
372 }
373
374 try
375 {
376 synchronized (this)
377 {
378 wait(this.interval);
379 }
380 }
381 catch (InterruptedException e)
382 {
383 ;
384 }
385 }
386 }
387
388 public void killJob(WorkerImpl worker, RenderingJob job) {
389 try {
390 if (log.isWarnEnabled()) {
391 PortletWindow window = job.getWindow();
392 ObjectID windowId = (null != window ? window.getId() : null);
393 log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId);
394 }
395
396 PortletContent content = job.getPortletContent();
397
398 synchronized (content)
399 {
400 if (!content.isComplete()) {
401 worker.interrupt();
402 content.wait();
403 }
404 }
405
406 } catch (Exception e) {
407 log.error("Exceptiong during job killing.", e);
408 }
409 }
410
411 }
412 }