%line | %branch | |||||||||
---|---|---|---|---|---|---|---|---|---|---|
org.apache.jetspeed.aggregator.impl.WorkerMonitorImpl$RenderingJobTimeoutMonitor |
|
|
1 | /* |
|
2 | * Licensed to the Apache Software Foundation (ASF) under one or more |
|
3 | * contributor license agreements. See the NOTICE file distributed with |
|
4 | * this work for additional information regarding copyright ownership. |
|
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 |
|
6 | * (the "License"); you may not use this file except in compliance with |
|
7 | * the License. You may obtain a copy of the License at |
|
8 | * |
|
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
|
10 | * |
|
11 | * Unless required by applicable law or agreed to in writing, software |
|
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
|
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 | * See the License for the specific language governing permissions and |
|
15 | * limitations under the License. |
|
16 | */ |
|
17 | ||
18 | package org.apache.jetspeed.aggregator.impl; |
|
19 | ||
20 | import java.security.AccessControlContext; |
|
21 | import java.security.AccessController; |
|
22 | import java.util.Iterator; |
|
23 | import java.util.ArrayList; |
|
24 | import java.util.List; |
|
25 | import java.util.Stack; |
|
26 | import java.util.LinkedList; |
|
27 | import java.util.Collections; |
|
28 | ||
29 | import org.apache.commons.logging.Log; |
|
30 | import org.apache.commons.logging.LogFactory; |
|
31 | import org.apache.jetspeed.aggregator.RenderingJob; |
|
32 | import org.apache.jetspeed.aggregator.Worker; |
|
33 | import org.apache.jetspeed.aggregator.WorkerMonitor; |
|
34 | import org.apache.jetspeed.aggregator.PortletContent; |
|
35 | import org.apache.jetspeed.util.Queue; |
|
36 | import org.apache.jetspeed.util.FIFOQueue; |
|
37 | ||
38 | import org.apache.pluto.om.window.PortletWindow; |
|
39 | import org.apache.pluto.om.common.ObjectID; |
|
40 | ||
41 | /** |
|
42 | * The WorkerMonitor is responsible for dispatching jobs to workers |
|
43 | * It uses an Apache HTTPd configuration style of min/max/spare workers |
|
44 | * threads to throttle the rendering work. |
|
45 | * If jobs come in faster that processing, they are stored in a queue |
|
46 | * which is flushed periodically by a QueueMonitor. |
|
47 | * |
|
48 | * @author <a href="mailto:raphael@apache.org">Rapha\u00ebl Luta</a> |
|
49 | * @author <a href="mailto:taylor@apache.org">David Sean Taylor </a> |
|
50 | * @version $Id: WorkerMonitorImpl.java 591867 2007-11-05 02:20:06Z woonsan $ |
|
51 | */ |
|
52 | public class WorkerMonitorImpl implements WorkerMonitor |
|
53 | { |
|
54 | public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName(); |
|
55 | ||
56 | public WorkerMonitorImpl(int minWorkers, class="keyword">int maxWorkers, class="keyword">int spareWorkers, class="keyword">int maxJobsPerWorker) |
|
57 | { |
|
58 | this.minWorkers = minWorkers; |
|
59 | this.maxWorkers = maxWorkers; |
|
60 | this.spareWorkers = spareWorkers; |
|
61 | this.maxJobsPerWorker = maxJobsPerWorker; |
|
62 | } |
|
63 | ||
64 | /** Commons logging */ |
|
65 | protected final static Log log = LogFactory.getLog(WorkerMonitorImpl.class); |
|
66 | ||
67 | /** Static counters for identifying workers */ |
|
68 | protected static long sCount = 0; |
|
69 | ||
70 | /** Count of running jobs **/ |
|
71 | protected int runningJobs = 0; |
|
72 | ||
73 | /** Minimum number of wokers to create */ |
|
74 | protected int minWorkers = 5; |
|
75 | ||
76 | /** Maximum number of workers */ |
|
77 | protected int maxWorkers = 50; |
|
78 | ||
79 | /** Minimum amount of spare workers */ |
|
80 | protected int spareWorkers = 3; |
|
81 | ||
82 | /** Maximum of job processed by a worker before being released */ |
|
83 | protected int maxJobsPerWorker = 10; |
|
84 | ||
85 | /** Stack containing currently idle workers */ |
|
86 | protected Stack workers = new Stack(); |
|
87 | ||
88 | /** The thread group used to group all worker threads */ |
|
89 | protected ThreadGroup tg = new ThreadGroup("Workers"); |
|
90 | ||
91 | /** Job queue */ |
|
92 | protected Queue queue; |
|
93 | ||
94 | /** Workers to be monitored for timeout checking */ |
|
95 | protected List workersMonitored = Collections.synchronizedList(new LinkedList()); |
|
96 | ||
97 | /** Renering Job Timeout monitor */ |
|
98 | protected RenderingJobTimeoutMonitor jobMonitor = null; |
|
99 | ||
100 | public void start() |
|
101 | { |
|
102 | addWorkers(this.minWorkers); |
|
103 | this.queue = new FIFOQueue(); |
|
104 | ||
105 | jobMonitor = new RenderingJobTimeoutMonitor(1000); |
|
106 | jobMonitor.start(); |
|
107 | } |
|
108 | ||
109 | public void stop() |
|
110 | { |
|
111 | if (jobMonitor != null) |
|
112 | jobMonitor.endThread(); |
|
113 | jobMonitor = null; |
|
114 | ||
115 | } |
|
116 | ||
117 | /** |
|
118 | * Create the request number of workers and add them to |
|
119 | * list of available workers. |
|
120 | * |
|
121 | * @param wCount the number of workers to create |
|
122 | */ |
|
123 | protected synchronized void addWorkers(int wCount) |
|
124 | { |
|
125 | int wCurrent = this.tg.activeCount(); |
|
126 | ||
127 | if (wCurrent < maxWorkers) |
|
128 | { |
|
129 | if (wCurrent + wCount > maxWorkers) |
|
130 | { |
|
131 | wCount = maxWorkers - wCurrent; |
|
132 | } |
|
133 | ||
134 | log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount)); |
|
135 | ||
136 | for (int i = 0; i < wCount; ++i) |
|
137 | { |
|
138 | Worker worker = new WorkerImpl(this, class="keyword">this.tg, "WORKER_" + (++sCount)); |
|
139 | worker.start(); |
|
140 | workers.push(worker); |
|
141 | } |
|
142 | } |
|
143 | } |
|
144 | ||
145 | /** |
|
146 | * Retrieves an idle worker |
|
147 | * |
|
148 | * @return a Worker from the idle pool or null if non available |
|
149 | */ |
|
150 | protected Worker getWorker() |
|
151 | { |
|
152 | synchronized(this.workers) |
|
153 | { |
|
154 | if (this.workers.size() < spareWorkers) |
|
155 | { |
|
156 | addWorkers(spareWorkers); |
|
157 | } |
|
158 | ||
159 | if (this.workers.size() == 0) |
|
160 | { |
|
161 | return null; |
|
162 | } |
|
163 | ||
164 | return (Worker)workers.pop(); |
|
165 | } |
|
166 | } |
|
167 | ||
168 | /** |
|
169 | * Assign a job to a worker and execute it or queue the job if no |
|
170 | * worker is available. |
|
171 | * |
|
172 | * @param job the Job to process |
|
173 | */ |
|
174 | public void process(RenderingJob job) |
|
175 | { |
|
176 | Worker worker = this.getWorker(); |
|
177 | ||
178 | AccessControlContext context = AccessController.getContext(); |
|
179 | job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context); |
|
180 | ||
181 | if (worker==null) |
|
182 | { |
|
183 | queue.push(job); |
|
184 | } |
|
185 | else |
|
186 | { |
|
187 | try |
|
188 | { |
|
189 | synchronized (worker) |
|
190 | { |
|
191 | worker.setJob(job, context); |
|
192 | ||
193 | if (job.getTimeout() > 0) { |
|
194 | workersMonitored.add(worker); |
|
195 | } |
|
196 | ||
197 | worker.notify(); |
|
198 | runningJobs++; |
|
199 | } |
|
200 | } |
|
201 | catch (Throwable t) |
|
202 | { |
|
203 | log.error("Worker exception", t); |
|
204 | } |
|
205 | } |
|
206 | } |
|
207 | ||
208 | /** |
|
209 | * Wait for all rendering jobs in the collection to finish successfully or otherwise. |
|
210 | * @param renderingJobs the Collection of rendering job objects to wait for. |
|
211 | */ |
|
212 | public void waitForRenderingJobs(List renderingJobs) |
|
213 | { |
|
214 | try |
|
215 | { |
|
216 | for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); ) |
|
217 | { |
|
218 | RenderingJob job = (RenderingJob) iter.next(); |
|
219 | PortletContent portletContent = job.getPortletContent(); |
|
220 | ||
221 | synchronized (portletContent) |
|
222 | { |
|
223 | if (!portletContent.isComplete()) |
|
224 | { |
|
225 | portletContent.wait(); |
|
226 | } |
|
227 | } |
|
228 | } |
|
229 | } |
|
230 | catch (Exception e) |
|
231 | { |
|
232 | log.error("Exception during synchronizing all portlet rendering jobs.", e); |
|
233 | } |
|
234 | } |
|
235 | ||
236 | /** |
|
237 | * Put back the worker in the idle queue unless there are pending jobs and |
|
238 | * worker can still be committed to a new job before being released. |
|
239 | */ |
|
240 | protected void release(Worker worker) |
|
241 | { |
|
242 | // if worker can still proces some jobs assign the first |
|
243 | // backlog job to this worker, else reset job count and put |
|
244 | // it on the idle queue. |
|
245 | ||
246 | long jobTimeout = 0; |
|
247 | ||
248 | RenderingJob oldJob = (RenderingJob) worker.getJob(); |
|
249 | if (oldJob != null) |
|
250 | { |
|
251 | jobTimeout = oldJob.getTimeout(); |
|
252 | } |
|
253 | ||
254 | synchronized (worker) |
|
255 | { |
|
256 | RenderingJob job = null; |
|
257 | ||
258 | if (worker.getJobCount() < this.maxJobsPerWorker) |
|
259 | { |
|
260 | job = (RenderingJob) queue.pop(); |
|
261 | ||
262 | if (job != null) |
|
263 | { |
|
264 | AccessControlContext context = (AccessControlContext) job.getWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR); |
|
265 | worker.setJob(job, context); |
|
266 | runningJobs--; |
|
267 | return; |
|
268 | } |
|
269 | } |
|
270 | ||
271 | if (job == null) |
|
272 | { |
|
273 | worker.setJob(null); |
|
274 | worker.resetJobCount(); |
|
275 | runningJobs--; |
|
276 | } |
|
277 | } |
|
278 | ||
279 | if (jobTimeout > 0) { |
|
280 | workersMonitored.remove(worker); |
|
281 | } |
|
282 | ||
283 | synchronized (this.workers) |
|
284 | { |
|
285 | this.workers.push(worker); |
|
286 | } |
|
287 | } |
|
288 | ||
289 | public int getQueuedJobsCount() |
|
290 | { |
|
291 | return queue.size(); |
|
292 | } |
|
293 | ||
294 | /** |
|
295 | * Returns a snapshot of the available jobs |
|
296 | * @return available jobs |
|
297 | */ |
|
298 | public int getAvailableJobsCount() |
|
299 | { |
|
300 | return workers.size(); |
|
301 | } |
|
302 | ||
303 | public int getRunningJobsCount() |
|
304 | { |
|
305 | return this.tg.activeCount(); |
|
306 | } |
|
307 | ||
308 | class RenderingJobTimeoutMonitor extends Thread { |
|
309 | ||
310 | 0 | long interval = 1000; |
311 | 0 | boolean shouldRun = true; |
312 | ||
313 | 0 | RenderingJobTimeoutMonitor(long interval) { |
314 | 0 | super("RenderingJobTimeoutMonitor"); |
315 | ||
316 | 0 | if (interval > 0) { |
317 | 0 | this.interval = interval; |
318 | } |
|
319 | 0 | } |
320 | /** |
|
321 | * Thread.stop() is deprecated. |
|
322 | * This method achieves the same by setting the run varaible "shouldRun" to false and interrupting the Thread, |
|
323 | * effectively causing the thread to shutdown correctly. |
|
324 | * |
|
325 | */ |
|
326 | public void endThread() |
|
327 | { |
|
328 | 0 | shouldRun = false; |
329 | 0 | this.interrupt(); |
330 | 0 | } |
331 | ||
332 | public void run() { |
|
333 | 0 | while (shouldRun) { |
334 | try |
|
335 | { |
|
336 | // Because a timeout worker can be removed |
|
337 | // in the workersMonitored collection during iterating, |
|
338 | // copy timeout workers in the following collection to kill later. |
|
339 | ||
340 | 0 | List timeoutWorkers = new ArrayList(); |
341 | ||
342 | 0 | synchronized (workersMonitored) |
343 | { |
|
344 | 0 | for (Iterator it = workersMonitored.iterator(); it.hasNext(); ) |
345 | { |
|
346 | 0 | WorkerImpl worker = (WorkerImpl) it.next(); |
347 | 0 | RenderingJob job = (RenderingJob) worker.getJob(); |
348 | ||
349 | 0 | if ((null != job) && (job.isTimeout())) |
350 | { |
|
351 | 0 | timeoutWorkers.add(worker); |
352 | } |
|
353 | 0 | } |
354 | 0 | } |
355 | ||
356 | // Now, we can kill the timeout worker(s). |
|
357 | 0 | for (Iterator it = timeoutWorkers.iterator(); it.hasNext(); ) |
358 | { |
|
359 | 0 | WorkerImpl worker = (WorkerImpl) it.next(); |
360 | 0 | RenderingJob job = (RenderingJob) worker.getJob(); |
361 | ||
362 | // If the job is just completed, then do not kill the worker. |
|
363 | 0 | if ((null != job) && (job.isTimeout())) |
364 | { |
|
365 | 0 | killJob(worker, job); |
366 | } |
|
367 | 0 | } |
368 | } |
|
369 | 0 | catch (Exception e) |
370 | { |
|
371 | 0 | log.error("Exception during job monitoring.", e); |
372 | 0 | } |
373 | ||
374 | try |
|
375 | { |
|
376 | 0 | synchronized (this) |
377 | { |
|
378 | 0 | wait(this.interval); |
379 | 0 | } |
380 | } |
|
381 | 0 | catch (InterruptedException e) |
382 | { |
|
383 | ; |
|
384 | 0 | } |
385 | } |
|
386 | 0 | } |
387 | ||
388 | public void killJob(WorkerImpl worker, RenderingJob job) { |
|
389 | try { |
|
390 | 0 | if (log.isWarnEnabled()) { |
391 | 0 | PortletWindow window = job.getWindow(); |
392 | 0 | ObjectID windowId = (null != window ? window.getId() : class="keyword">null); |
393 | 0 | log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId); |
394 | } |
|
395 | ||
396 | 0 | PortletContent content = job.getPortletContent(); |
397 | ||
398 | 0 | synchronized (content) |
399 | { |
|
400 | 0 | if (!content.isComplete()) { |
401 | 0 | worker.interrupt(); |
402 | 0 | content.wait(); |
403 | } |
|
404 | 0 | } |
405 | ||
406 | 0 | } catch (Exception e) { |
407 | 0 | log.error("Exceptiong during job killing.", e); |
408 | 0 | } |
409 | 0 | } |
410 | ||
411 | } |
|
412 | } |
This report is generated by jcoverage, Maven and Maven JCoverage Plugin. |