1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
|
18 |
|
package org.apache.jetspeed.aggregator.impl; |
19 |
|
|
20 |
|
import java.security.AccessControlContext; |
21 |
|
import java.security.AccessController; |
22 |
|
import java.util.List; |
23 |
|
import java.util.ArrayList; |
24 |
|
import java.util.Iterator; |
25 |
|
import java.util.Collection; |
26 |
|
import java.util.Collections; |
27 |
|
import java.util.Map; |
28 |
|
import java.util.HashMap; |
29 |
|
import java.util.Arrays; |
30 |
|
|
31 |
|
import org.apache.commons.logging.Log; |
32 |
|
import org.apache.commons.logging.LogFactory; |
33 |
|
import org.apache.jetspeed.aggregator.RenderingJob; |
34 |
|
import org.apache.jetspeed.aggregator.Worker; |
35 |
|
import org.apache.jetspeed.aggregator.WorkerMonitor; |
36 |
|
import org.apache.jetspeed.aggregator.PortletContent; |
37 |
|
|
38 |
|
import org.apache.pluto.om.window.PortletWindow; |
39 |
|
import org.apache.pluto.om.common.ObjectID; |
40 |
|
|
41 |
|
import commonj.work.WorkManager; |
42 |
|
import commonj.work.Work; |
43 |
|
import commonj.work.WorkItem; |
44 |
|
import commonj.work.WorkListener; |
45 |
|
import commonj.work.WorkEvent; |
46 |
|
|
47 |
|
|
48 |
|
|
49 |
|
|
50 |
|
|
51 |
|
|
52 |
|
|
53 |
|
|
54 |
|
public class CommonjWorkerMonitorImpl implements WorkerMonitor, WorkListener |
55 |
|
{ |
56 |
|
|
57 |
|
public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName(); |
58 |
|
public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class.getName(); |
59 |
|
public static final String WORKER_THREAD_ATTR = Worker.class.getName(); |
60 |
|
|
61 |
|
|
62 |
|
protected WorkManager workManager; |
63 |
|
|
64 |
|
|
65 |
|
protected boolean interruptOnTimeout = true; |
66 |
|
|
67 |
|
|
68 |
|
protected boolean jobWorksMonitorEnabled = true; |
69 |
|
|
70 |
|
|
71 |
|
protected Map jobWorksMonitored = Collections.synchronizedMap(new HashMap()); |
72 |
|
|
73 |
|
public CommonjWorkerMonitorImpl(WorkManager workManager) |
74 |
|
{ |
75 |
|
this(workManager, true); |
76 |
|
} |
77 |
|
|
78 |
|
public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled) |
79 |
|
{ |
80 |
|
this(workManager, jobWorksMonitorEnabled, true); |
81 |
|
} |
82 |
|
|
83 |
|
public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled, class="keyword">boolean interruptOnTimeout) |
84 |
|
{ |
85 |
|
this.workManager = workManager; |
86 |
|
this.jobWorksMonitorEnabled = jobWorksMonitorEnabled; |
87 |
|
this.interruptOnTimeout = interruptOnTimeout; |
88 |
|
} |
89 |
|
|
90 |
|
|
91 |
|
protected final static Log log = LogFactory.getLog(CommonjWorkerMonitorImpl.class); |
92 |
|
|
93 |
|
|
94 |
|
protected CommonjWorkerRenderingJobTimeoutMonitor jobMonitor = null; |
95 |
|
|
96 |
|
public void start() |
97 |
|
{ |
98 |
|
if (this.jobWorksMonitorEnabled) |
99 |
|
{ |
100 |
|
jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000); |
101 |
|
jobMonitor.start(); |
102 |
|
} |
103 |
|
} |
104 |
|
|
105 |
|
public void stop() |
106 |
|
{ |
107 |
|
if (jobMonitor != null) |
108 |
|
{ |
109 |
|
jobMonitor.endThread(); |
110 |
|
} |
111 |
|
|
112 |
|
jobMonitor = null; |
113 |
|
} |
114 |
|
|
115 |
|
|
116 |
|
|
117 |
|
|
118 |
|
|
119 |
|
|
120 |
|
|
121 |
|
public void process(RenderingJob job) |
122 |
|
{ |
123 |
|
AccessControlContext context = AccessController.getContext(); |
124 |
|
job.setWorkerAttribute(ACCESS_CONTROL_CONTEXT_WORKER_ATTR, context); |
125 |
|
|
126 |
|
try |
127 |
|
{ |
128 |
|
RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(job); |
129 |
|
WorkItem workItem = this.workManager.schedule(jobWork, class="keyword">this); |
130 |
|
job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem); |
131 |
|
|
132 |
|
if (this.jobWorksMonitorEnabled) |
133 |
|
{ |
134 |
|
this.jobWorksMonitored.put(workItem, jobWork); |
135 |
|
} |
136 |
|
} |
137 |
|
catch (Throwable t) |
138 |
|
{ |
139 |
|
log.error("Worker exception", t); |
140 |
|
} |
141 |
|
} |
142 |
|
|
143 |
|
public int getQueuedJobsCount() |
144 |
|
{ |
145 |
|
return 0; |
146 |
|
} |
147 |
|
|
148 |
|
|
149 |
|
|
150 |
|
|
151 |
|
|
152 |
|
public void waitForRenderingJobs(List renderingJobs) |
153 |
|
{ |
154 |
|
if (this.jobWorksMonitorEnabled) |
155 |
|
{ |
156 |
|
try |
157 |
|
{ |
158 |
|
for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); ) |
159 |
|
{ |
160 |
|
RenderingJob job = (RenderingJob) iter.next(); |
161 |
|
PortletContent portletContent = job.getPortletContent(); |
162 |
|
|
163 |
|
synchronized (portletContent) |
164 |
|
{ |
165 |
|
if (!portletContent.isComplete()) |
166 |
|
{ |
167 |
|
portletContent.wait(); |
168 |
|
} |
169 |
|
} |
170 |
|
} |
171 |
|
} |
172 |
|
catch (Exception e) |
173 |
|
{ |
174 |
|
log.error("Exception during synchronizing all portlet rendering jobs.", e); |
175 |
|
} |
176 |
|
} |
177 |
|
else |
178 |
|
{ |
179 |
|
|
180 |
|
|
181 |
|
|
182 |
|
try |
183 |
|
{ |
184 |
|
if (!renderingJobs.isEmpty()) |
185 |
|
{ |
186 |
|
Object lock = new Object(); |
187 |
|
MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(lock, renderingJobs); |
188 |
|
|
189 |
|
synchronized (lock) |
190 |
|
{ |
191 |
|
WorkItem monitorWorkItem = this.workManager.schedule(monitoringWork, class="keyword">this); |
192 |
|
lock.wait(); |
193 |
|
} |
194 |
|
} |
195 |
|
} |
196 |
|
catch (Exception e) |
197 |
|
{ |
198 |
|
log.error("Exception during synchronizing all portlet rendering jobs.", e); |
199 |
|
} |
200 |
|
} |
201 |
|
} |
202 |
|
|
203 |
|
|
204 |
|
|
205 |
|
|
206 |
|
|
207 |
|
public int getAvailableJobsCount() |
208 |
|
{ |
209 |
|
return 0; |
210 |
|
} |
211 |
|
|
212 |
|
public int getRunningJobsCount() |
213 |
|
{ |
214 |
|
return 0; |
215 |
|
} |
216 |
|
|
217 |
|
|
218 |
|
|
219 |
|
public void workAccepted(WorkEvent we) |
220 |
|
{ |
221 |
|
WorkItem workItem = we.getWorkItem(); |
222 |
|
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workAccepted: " + workItem); |
223 |
|
} |
224 |
|
|
225 |
|
public void workRejected(WorkEvent we) |
226 |
|
{ |
227 |
|
WorkItem workItem = we.getWorkItem(); |
228 |
|
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workRejected: " + workItem); |
229 |
|
|
230 |
|
if (this.jobWorksMonitorEnabled) |
231 |
|
{ |
232 |
|
removeMonitoredJobWork(workItem); |
233 |
|
} |
234 |
|
} |
235 |
|
|
236 |
|
public void workStarted(WorkEvent we) |
237 |
|
{ |
238 |
|
WorkItem workItem = we.getWorkItem(); |
239 |
|
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workStarted: " + workItem); |
240 |
|
} |
241 |
|
|
242 |
|
public void workCompleted(WorkEvent we) |
243 |
|
{ |
244 |
|
WorkItem workItem = we.getWorkItem(); |
245 |
|
if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workCompleted: " + workItem); |
246 |
|
|
247 |
|
if (this.jobWorksMonitorEnabled) |
248 |
|
{ |
249 |
|
removeMonitoredJobWork(workItem); |
250 |
|
} |
251 |
|
} |
252 |
|
|
253 |
|
protected Object removeMonitoredJobWork(WorkItem workItem) |
254 |
|
{ |
255 |
|
return this.jobWorksMonitored.remove(workItem); |
256 |
|
} |
257 |
|
|
258 |
|
class RenderingJobCommonjWork implements Work |
259 |
|
{ |
260 |
|
|
261 |
|
protected RenderingJob job; |
262 |
|
|
263 |
|
public RenderingJobCommonjWork(RenderingJob job) |
264 |
|
{ |
265 |
|
this.job = job; |
266 |
|
} |
267 |
|
|
268 |
|
public boolean isDaemon() |
269 |
|
{ |
270 |
|
return false; |
271 |
|
} |
272 |
|
|
273 |
|
public void run() |
274 |
|
{ |
275 |
|
if (jobWorksMonitorEnabled || interruptOnTimeout) |
276 |
|
{ |
277 |
|
this.job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread.currentThread()); |
278 |
|
} |
279 |
|
|
280 |
|
this.job.run(); |
281 |
|
} |
282 |
|
|
283 |
|
public void release() |
284 |
|
{ |
285 |
|
} |
286 |
|
|
287 |
|
public RenderingJob getRenderingJob() |
288 |
|
{ |
289 |
|
return this.job; |
290 |
|
} |
291 |
|
} |
292 |
|
|
293 |
|
class MonitoringJobCommonjWork implements Work |
294 |
|
{ |
295 |
|
|
296 |
|
protected Object lock; |
297 |
|
protected List renderingJobs; |
298 |
|
|
299 |
|
public MonitoringJobCommonjWork(Object lock, List jobs) |
300 |
0 |
{ |
301 |
0 |
this.lock = lock; |
302 |
0 |
this.renderingJobs = new ArrayList(jobs); |
303 |
0 |
} |
304 |
|
|
305 |
|
public boolean isDaemon() |
306 |
|
{ |
307 |
0 |
return false; |
308 |
|
} |
309 |
|
|
310 |
|
public void run() |
311 |
|
{ |
312 |
|
try |
313 |
|
{ |
314 |
0 |
while (!this.renderingJobs.isEmpty()) |
315 |
|
{ |
316 |
0 |
for (Iterator it = this.renderingJobs.iterator(); it.hasNext(); ) |
317 |
|
{ |
318 |
0 |
RenderingJob job = (RenderingJob) it.next(); |
319 |
0 |
WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR); |
320 |
0 |
int status = WorkEvent.WORK_ACCEPTED; |
321 |
|
|
322 |
0 |
if (workItem != null) |
323 |
|
{ |
324 |
0 |
status = workItem.getStatus(); |
325 |
|
} |
326 |
|
|
327 |
0 |
boolean isTimeout = job.isTimeout(); |
328 |
|
|
329 |
0 |
if (isTimeout) |
330 |
|
{ |
331 |
0 |
PortletContent content = job.getPortletContent(); |
332 |
|
|
333 |
0 |
if (interruptOnTimeout) |
334 |
|
{ |
335 |
0 |
Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR); |
336 |
|
|
337 |
0 |
if (worker != null) |
338 |
|
{ |
339 |
0 |
synchronized (content) |
340 |
|
{ |
341 |
0 |
if (!content.isComplete()) { |
342 |
0 |
worker.interrupt(); |
343 |
0 |
content.wait(); |
344 |
|
} |
345 |
0 |
} |
346 |
|
} |
347 |
0 |
} |
348 |
|
else |
349 |
|
{ |
350 |
0 |
synchronized (content) |
351 |
|
{ |
352 |
0 |
content.completeWithError(); |
353 |
0 |
} |
354 |
|
} |
355 |
|
} |
356 |
|
|
357 |
0 |
if (status == WorkEvent.WORK_COMPLETED || status == WorkEvent.WORK_REJECTED || isTimeout) |
358 |
|
{ |
359 |
0 |
it.remove(); |
360 |
|
} |
361 |
0 |
} |
362 |
|
|
363 |
0 |
if (!this.renderingJobs.isEmpty()) |
364 |
|
{ |
365 |
0 |
synchronized (this) |
366 |
|
{ |
367 |
0 |
wait(100); |
368 |
0 |
} |
369 |
|
} |
370 |
|
} |
371 |
|
|
372 |
0 |
synchronized (this.lock) |
373 |
|
{ |
374 |
0 |
this.lock.notify(); |
375 |
0 |
} |
376 |
|
} |
377 |
0 |
catch (Exception e) |
378 |
|
{ |
379 |
0 |
log.error("Exceptiong during job timeout monitoring.", e); |
380 |
0 |
} |
381 |
0 |
} |
382 |
|
|
383 |
|
public void release() |
384 |
|
{ |
385 |
0 |
} |
386 |
|
|
387 |
|
} |
388 |
|
|
389 |
|
class CommonjWorkerRenderingJobTimeoutMonitor extends Thread { |
390 |
|
|
391 |
|
long interval = 1000; |
392 |
|
boolean shouldRun = true; |
393 |
|
|
394 |
|
CommonjWorkerRenderingJobTimeoutMonitor(long interval) |
395 |
|
{ |
396 |
|
super("CommonjWorkerRenderingJobTimeoutMonitor"); |
397 |
|
|
398 |
|
if (interval > 0) |
399 |
|
{ |
400 |
|
this.interval = interval; |
401 |
|
} |
402 |
|
} |
403 |
|
|
404 |
|
|
405 |
|
|
406 |
|
|
407 |
|
|
408 |
|
|
409 |
|
public void endThread() |
410 |
|
{ |
411 |
|
shouldRun = false; |
412 |
|
this.interrupt(); |
413 |
|
} |
414 |
|
|
415 |
|
public void run() { |
416 |
|
while (shouldRun) { |
417 |
|
try |
418 |
|
{ |
419 |
|
List timeoutJobWorks = new ArrayList(); |
420 |
|
Collection jobWorks = Arrays.asList(jobWorksMonitored.values().toArray()); |
421 |
|
|
422 |
|
for (Iterator it = jobWorks.iterator(); it.hasNext(); ) |
423 |
|
{ |
424 |
|
RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next(); |
425 |
|
RenderingJob job = jobWork.getRenderingJob(); |
426 |
|
|
427 |
|
if (job.isTimeout()) |
428 |
|
{ |
429 |
|
timeoutJobWorks.add(jobWork); |
430 |
|
} |
431 |
|
} |
432 |
|
|
433 |
|
|
434 |
|
for (Iterator it = timeoutJobWorks.iterator(); it.hasNext(); ) |
435 |
|
{ |
436 |
|
RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next(); |
437 |
|
RenderingJob job = jobWork.getRenderingJob(); |
438 |
|
|
439 |
|
|
440 |
|
if (job.isTimeout()) |
441 |
|
{ |
442 |
|
killJobWork(jobWork); |
443 |
|
} |
444 |
|
} |
445 |
|
} |
446 |
|
catch (Exception e) |
447 |
|
{ |
448 |
|
log.error("Exception during job monitoring.", e); |
449 |
|
} |
450 |
|
|
451 |
|
try |
452 |
|
{ |
453 |
|
synchronized (this) |
454 |
|
{ |
455 |
|
wait(this.interval); |
456 |
|
} |
457 |
|
} |
458 |
|
catch (InterruptedException e) |
459 |
|
{ |
460 |
|
; |
461 |
|
} |
462 |
|
} |
463 |
|
} |
464 |
|
|
465 |
|
public void killJobWork(RenderingJobCommonjWork jobWork) { |
466 |
|
RenderingJob job = jobWork.getRenderingJob(); |
467 |
|
|
468 |
|
try { |
469 |
|
if (log.isWarnEnabled()) { |
470 |
|
PortletWindow window = job.getWindow(); |
471 |
|
ObjectID windowId = (null != window ? window.getId() : class="keyword">null); |
472 |
|
log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout() + "ms): " + windowId); |
473 |
|
} |
474 |
|
|
475 |
|
PortletContent content = job.getPortletContent(); |
476 |
|
Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR); |
477 |
|
|
478 |
|
if (worker != null) |
479 |
|
{ |
480 |
|
synchronized (content) |
481 |
|
{ |
482 |
|
if (!content.isComplete()) { |
483 |
|
worker.interrupt(); |
484 |
|
content.wait(); |
485 |
|
} |
486 |
|
} |
487 |
|
} |
488 |
|
} catch (Exception e) { |
489 |
|
log.error("Exceptiong during job killing.", e); |
490 |
|
} finally { |
491 |
|
WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR); |
492 |
|
|
493 |
|
if (workItem != null) |
494 |
|
{ |
495 |
|
removeMonitoredJobWork(workItem); |
496 |
|
} |
497 |
|
} |
498 |
|
} |
499 |
|
|
500 |
|
} |
501 |
|
|
502 |
|
} |