View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.jetspeed.aggregator.impl;
19  
20  import java.security.AccessControlContext;
21  import java.security.PrivilegedAction;
22  
23  import javax.security.auth.Subject;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.jetspeed.aggregator.RenderingJob;
28  import org.apache.jetspeed.aggregator.Worker;
29  import org.apache.jetspeed.aggregator.WorkerMonitor;
30  import org.apache.jetspeed.security.JSSubject;
31  
32  /***
33   * Worker thread processes jobs and notify its WorkerMonitor when completed.
34   * When no work is available, the worker simply sets itself in a waiting mode
35   * pending reactivation by the WorkerMonitor
36   *
37   * @author <a href="mailto:raphael@apache.org">Raphael Luta</a>
38   * @author <a>Woonsan Ko</a>
39   * @version $Id: WorkerImpl.java 587064 2007-10-22 11:54:11Z woonsan $
40   */
41  public class WorkerImpl extends Thread implements Worker
42  {
43      /*** Commons logging */
44      protected final static Log log = LogFactory.getLog(WorkerImpl.class);
45  
46      /*** Running status of this worker */
47      private boolean running = true;
48  
49      /*** Counter of consecutive jobs that can be processed before the
50          worker being actually put back on the idle queue */
51      private int jobCount = 0;
52  
53      /*** Job to process */
54      private Runnable job = null;
55  
56      /*** Context to process job within */
57      private AccessControlContext context = null;
58  
59      /*** Monitor for this Worker */
60      private WorkerMonitor monitor = null;
61  
62      public WorkerImpl(WorkerMonitor monitor)
63      {
64          super();
65          this.setMonitor(monitor);
66          this.setDaemon(true);
67      }
68  
69      public WorkerImpl(WorkerMonitor monitor, ThreadGroup tg, String name)
70      {
71          super(tg, name);
72          this.setMonitor(monitor);
73          this.setDaemon(true);
74      }
75  
76      /***
77       * Return the number of jobs processed by this worker since the last time it
78       * has been on the idle queue
79       */
80      public int getJobCount()
81      {
82          return this.jobCount;
83      }
84  
85      /***
86       * Reset the processed job counter
87       */
88      public void resetJobCount()
89      {
90          this.jobCount=0;
91      }
92  
93      /***
94       * Sets the running status of this Worker. If set to false, the Worker will
95       * stop after processing its current job.
96       */
97      public void setRunning(boolean status)
98      {
99          this.running = status;
100     }
101 
102     /***
103      * Sets the moitor of this worker
104      */
105     public void setMonitor(WorkerMonitor monitor)
106     {
107         this.monitor = monitor;
108     }
109 
110     /***
111      * Sets the job to execute in security context
112      */
113     public void setJob(Runnable job, AccessControlContext context)
114     {
115         this.job = job;
116         this.context = context;
117     }
118 
119     /***
120      * Sets the job to execute
121      */
122     public void setJob(Runnable job)
123     {
124         this.job = job;
125         this.context = null;
126     }
127 
128     /***
129      * Retrieves the job to execute
130      */
131     public Runnable getJob()
132     {
133         return this.job;
134     }
135 
136     /***
137      * Process the job assigned, then notify Monitor. If no job available,
138      * go into sleep mode
139      */
140     public void run()
141     {
142         while (running)
143         {
144             // wait for a job to come
145             synchronized (this)
146             {
147                 if (this.job == null)
148                 {
149                     try
150                     {
151                         this.wait();
152                     }
153                     catch (InterruptedException e)
154                     {
155                         // nothing done
156                     }
157                 }
158             }
159 
160             // process it
161             if (this.job != null)
162             {
163                 log.debug("Processing job for window :" + ((RenderingJob)job).getWindow().getId());
164                 Subject subject = null;
165                 if (this.context != null)
166                 {
167                     subject = JSSubject.getSubject(this.context);
168                 }
169                 if (subject != null)
170                 {
171                     JSSubject.doAsPrivileged(subject, new PrivilegedAction()
172                         {
173                             public Object run()
174                             {
175                                 try 
176                                 {
177                                     WorkerImpl.this.job.run();
178                                 }
179                                 catch (Throwable t)
180                                 {                        
181                                     log.error("Thread error", t);
182                                 }
183                                 return null;                    
184                             }
185                         }, this.context);
186                 }
187                 else
188                 {
189                     try
190                     {
191                         this.job.run();
192                     }
193                     catch (Throwable t)
194                     {
195                         log.error("Thread error", t);
196                     }
197                 }
198             }
199 
200             this.jobCount++;
201 
202             // release the worker
203             ((WorkerMonitorImpl) monitor).release(this);
204         }
205     }
206     
207 }