1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.jetspeed.services.threadpool;
18
19
20 import java.util.*;
21 import javax.servlet.ServletConfig;
22
23
24 import org.apache.turbine.services.TurbineBaseService;
25
26
27 import org.apache.jetspeed.services.logging.JetspeedLogFactoryService;
28 import org.apache.jetspeed.services.logging.JetspeedLogger;
29
30 /***
31 * This is a Service that provides a simple threadpool usable by all
32 * thread intensive classes in order to optimize resources utilization
33 * screen:<br>
34 *
35 * <p>It uses 3 parameters for contolling resource usage:
36 * <dl>
37 * <dt>init.count</dt>
38 * <dd>The number of threads to start at initizaliation</dd>
39 * <dt>max.count</dt>
40 * <dd>The maximum number of threads started by this service</dd>
41 * <dt>minspare.count</dt>
42 * <dd>The pool tries to keep lways this minimum number if threads
43 * available</dd>
44 * </dl>
45 * </p>
46 *
47 * @author <a href="mailto:burton@apache.org">Kevin A. Burton</a>
48 * @author <a href="mailto:raphael@apache.org">Raphaël Luta</a>
49 * @author <a href="mailto:sgala@hisitech.com">Santiago Gala</a>
50 * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
51 */
52 public class JetspeedThreadPoolService
53 extends TurbineBaseService
54 implements ThreadPoolService
55 {
56 /***
57 * Static initialization of the logger for this class
58 */
59 protected static final JetspeedLogger logger = JetspeedLogFactoryService.getLogger(JetspeedThreadPoolService.class.getName());
60
61 /***
62 * The number of threads to create on initialization
63 */
64 private int initThreads = 50;
65
66 /***
67 * The maximum number of threads that should ever be created.
68 */
69 private int maxThreads = 100;
70
71 /***
72 * The minimum amount of threads that should always be available
73 */
74 private int minSpareThreads = 15;
75
76 /***
77 * The default priority to use when creating new threads.
78 */
79 public static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY;
80
81 /***
82 * Stores threads that are available within the pool.
83 */
84 private Vector availableThreads = new Vector();
85
86
87 /***
88 * The thread group used for all created threads.
89 */
90 private ThreadGroup tg = new ThreadGroup("JetspeedThreadPoolService");
91
92 /***
93 * Create a new queue for adding Runnable objects to.
94 */
95 private Queue queue = new Queue();
96
97 /***
98 * Holds the total number of threads that have ever been processed.
99 */
100 private int count = 0;
101
102
103 /***
104 * Constructor.
105 *
106 * @exception Exception, a generic exception.
107 */
108 public JetspeedThreadPoolService()
109 throws Exception
110 {
111 }
112
113
114 /***
115 * Late init. Don't return control until early init says we're done.
116 */
117 public void init( )
118 {
119 while( !getInit() ) {
120 try {
121 Thread.sleep(500);
122 } catch (InterruptedException ie ) {
123 logger.info("ThreadPool service: Waiting for init()..." );
124 }
125 }
126 }
127
128 /***
129 * Called during Turbine.init()
130 *
131 * @param config A ServletConfig.
132 */
133 public synchronized void init( ServletConfig config )
134 {
135 if( getInit() ) {
136
137 return;
138 }
139
140 try
141 {
142 logger.info ( "JetspeedThreadPoolService early init()....starting!");
143 initThreadpool(config);
144 setInit(true);
145 logger.info ( "JetspeedThreadPoolService early init()....finished!");
146 }
147 catch (Exception e)
148 {
149 logger.error ( "Cannot initialize JetspeedThreadPoolService!", e );
150 }
151
152
153 }
154
155 /***
156 * Processes the Runnable object with an available thread at default priority
157 *
158 * @see #process( Runnable, int )
159 * @param runnable the runnable code to process
160 */
161 public void process( Runnable runnable ) {
162
163 process( runnable, Thread.MIN_PRIORITY );
164
165 }
166
167 /***
168 * Process a Runnable object by allocating a Thread for it
169 * at the given priority
170 *
171 * @param runnable the runnable code to process
172 * @param priority the priority used be the thread that will run this runnable
173 */
174 public void process( Runnable runnable, int priority ) {
175
176 RunnableThread thread = this.getAvailableThread();
177
178 if ( thread == null ) {
179
180 this.getQueue().add( runnable );
181
182 } else {
183
184 try {
185 synchronized ( thread ) {
186
187 int defaultPriority = thread.getPriority();
188 if( defaultPriority != priority ) {
189
190
191 thread.setPriority( priority );
192 }
193 thread.setRunnable( runnable );
194 thread.notify();
195 }
196 } catch ( Throwable t ) {
197 logger.error("Throwable", t);
198 }
199
200 }
201
202
203 }
204
205 /***
206 * Get the number of threads that have been created
207 *
208 * @return the number of threads currently created by the pool
209 */
210 public int getThreadCount() {
211 return this.tg.activeCount();
212 }
213
214 /***
215 * Get the number of threads that are available.
216 *
217 * @return the number of threads available in the pool
218 */
219 public int getAvailableThreadCount() {
220 return this.availableThreads.size();
221 }
222
223 /***
224 * Get the current length of the Runnable queue, waiting for processing
225 *
226 * @return the length of the queue of waiting processes
227 */
228 public int getQueueLength() {
229 return this.getQueue().size();
230 }
231
232 /***
233 * Get the number of threads that have successfully been processed
234 * for logging and debugging purposes.
235 *
236 * @return the number of processes executed since initialization
237 */
238 public int getThreadProcessedCount() {
239 return this.count;
240 }
241
242 /***
243 * Get the queue used by the JetspeedThreadPoolService
244 *
245 * @return the queue holding the waiting processes
246 */
247 Queue getQueue() {
248 return this.queue;
249 }
250
251 /***
252 * Place this thread back into the pool so that it can be used again
253 *
254 * @param thread the thread to release back to the pool
255 */
256 void release( RunnableThread thread ) {
257
258 synchronized ( this.availableThreads ) {
259
260 this.availableThreads.addElement( thread );
261
262 ++this.count;
263
264
265
266
267
268
269 synchronized( this.getQueue() ) {
270
271
272
273 if ( this.getQueue().size() > 0 ) {
274
275 Runnable r = this.getQueue().get();
276
277 if ( r != null ) {
278 this.process( r );
279 } else {
280 logger.info( "JetspeedThreadPoolService: no Runnable found." );
281 }
282
283 }
284
285 }
286
287 }
288
289 }
290
291 /***
292 * This method initialized the ThreadPool
293 *
294 * @param config A ServletConfig.
295 */
296 private void initThreadpool( ServletConfig config )
297 {
298 Properties props = getProperties();
299
300 try {
301
302 this.initThreads = Integer.parseInt( props.getProperty( "init.count" ) );
303 this.maxThreads = Integer.parseInt( props.getProperty( "max.count" ) );
304 this.minSpareThreads = Integer.parseInt( props.getProperty( "minspare.count" ) );
305
306 } catch ( NumberFormatException e ) {
307 logger.error("Invalid number format in properties", e);
308 }
309
310
311 createThreads( this.initThreads );
312
313 }
314
315 /***
316 * Create "count" number of threads and make them available.
317 *
318 * @param count the number of threads to create
319 */
320 private synchronized void createThreads( int count ) {
321
322
323
324
325 if ( this.getThreadCount() < this.maxThreads &&
326 this.getThreadCount() + count > this.maxThreads ) {
327
328 count = this.maxThreads - this.getThreadCount();
329
330 } else if ( this.getThreadCount() >= this.maxThreads ) {
331
332 return;
333 }
334
335 logger.info( "JetspeedThreadPoolService: creating " +
336 count +
337 " more thread(s) for a total of: " +
338 ( this.getThreadCount() + count ) );
339
340 for (int i = 0; i < count; ++i ) {
341
342
343
344 RunnableThread thread = new RunnableThread( this.tg);
345 thread.setPriority( DEFAULT_THREAD_PRIORITY );
346
347 thread.start();
348
349
350 }
351
352 }
353
354 /***
355 * Get a thread that is available from the pool or null if there are no more
356 * threads left.
357 *
358 * @return a thread from the pool or null if non available
359 */
360 private RunnableThread getAvailableThread() {
361
362
363 synchronized( this.availableThreads ) {
364
365
366
367
368 if ( this.getAvailableThreadCount() < this.minSpareThreads ) {
369 this.createThreads( this.minSpareThreads );
370 }
371
372
373 if ( this.getAvailableThreadCount() == 0 ) {
374 return null;
375 }
376
377 RunnableThread thread = null;
378
379
380
381
382 int id = this.availableThreads.size() - 1;
383
384
385
386 thread = (RunnableThread)this.availableThreads.elementAt( id );
387 this.availableThreads.removeElementAt( id );
388
389 return thread;
390 }
391
392
393 }
394
395 }
396
397 /***
398 * Handles holding Runnables until they are ready to be processed. This is an impl
399 * of a FIFO (First In First Out) Queue. This makes it possible to add Runnable
400 * objects so that they get processed and they pass through the queue in a predictable
401 * fashion.
402 *
403 * @author <a href="mailto:burton@apache.org">Kevin A. Burton</a>
404 * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
405 */
406 class Queue {
407
408 /***
409 * Holds Runnables that have been requested to process but there are no
410 * threads available.
411 */
412 private Vector queue = new Vector();
413
414 /***
415 * Add a Runnable object into the queue.
416 *
417 * @param runnable the process to add to the queue
418 */
419 public synchronized void add( Runnable runnable ) {
420 queue.insertElementAt( runnable, 0 );
421 }
422
423 /***
424 * Get a Runnable object from the queue, and then remove it. Return null
425 * if no more Runnable objects exist.
426 *
427 * @return the first Runnable stored in the queue or null if empty
428 */
429 public synchronized Runnable get() {
430
431 if ( this.queue.size() == 0 ) {
432 JetspeedThreadPoolService.logger.info( "JetspeedThreadPoolService->Queue: No more Runnables left in queue. Returning null" );
433 return null;
434 }
435
436 int id = queue.size() - 1;
437 Runnable runnable = (Runnable)queue.elementAt( id );
438 this.queue.removeElementAt( id );
439
440 return runnable;
441 }
442
443 /***
444 * Return the size of the queue.
445 *
446 * @return the size of the queue
447 */
448 public int size() {
449 return this.queue.size();
450 }
451
452 }