1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.util;
20
21 import java.util.HashSet;
22 import java.util.IdentityHashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.Set;
26
27 import org.apache.mina.common.Session;
28
29 /***
30 * A base implementation of Thread-pooling filters.
31 * This filter forwards events to its thread pool. This is an implementation of
32 * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33 * thread pool</a> by Douglas C. Schmidt et al.
34 *
35 * @author Trustin Lee (trustin@apache.org)
36 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
37 */
38 public abstract class BaseThreadPool implements ThreadPool
39 {
40 /***
41 * Default maximum size of thread pool (2G).
42 */
43 public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
44
45 /***
46 * Default keep-alive time of thread pool (1 min).
47 */
48 public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49
50 private static volatile int threadId = 0;
51
52 private final Map buffers = new IdentityHashMap();
53
54 private final Stack followers = new Stack();
55
56 private final BlockingSet readySessionBuffers = new BlockingSet();
57
58 private final Set busySessionBuffers = new HashSet();
59
60 private Worker leader;
61
62 private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
63
64 private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
65
66 private boolean started;
67
68 private boolean shuttingDown;
69
70 private int poolSize;
71
72 private final Object poolSizeLock = new Object();
73
/**
 * Constructs a pool that uses the default maximum pool size and
 * keep-alive time.  No thread is created until {@link #start()} is
 * invoked.
 */
protected BaseThreadPool()
{
    // nothing to initialize beyond the field defaults
}
81
82 public int getPoolSize()
83 {
84 synchronized( poolSizeLock )
85 {
86 return poolSize;
87 }
88 }
89
90 public int getMaximumPoolSize()
91 {
92 return maximumPoolSize;
93 }
94
95 public int getKeepAliveTime()
96 {
97 return keepAliveTime;
98 }
99
100 public void setMaximumPoolSize( int maximumPoolSize )
101 {
102 if( maximumPoolSize <= 0 )
103 throw new IllegalArgumentException();
104 this.maximumPoolSize = maximumPoolSize;
105 }
106
107 public void setKeepAliveTime( int keepAliveTime )
108 {
109 this.keepAliveTime = keepAliveTime;
110 }
111
112 public synchronized void start()
113 {
114 if( started )
115 return;
116
117 shuttingDown = false;
118
119 leader = new Worker();
120 leader.start();
121 leader.lead();
122
123 started = true;
124 }
125
126 public synchronized void stop()
127 {
128 if( !started )
129 return;
130
131 shuttingDown = true;
132 Worker lastLeader = null;
133 for( ;; )
134 {
135 Worker leader = this.leader;
136 if( lastLeader == leader )
137 break;
138
139 while( leader.isAlive() )
140 {
141 leader.interrupt();
142 try
143 {
144 leader.join();
145 }
146 catch( InterruptedException e )
147 {
148 }
149 }
150
151 lastLeader = leader;
152 }
153
154 started = false;
155 }
156
157 private void increasePoolSize()
158 {
159 synchronized( poolSizeLock )
160 {
161 poolSize++;
162 }
163 }
164
165 private void decreasePoolSize()
166 {
167 synchronized( poolSizeLock )
168 {
169 poolSize--;
170 }
171 }
172
173 protected void fireEvent( Object nextFilter, Session session,
174 EventType type, Object data )
175 {
176 final BlockingSet readySessionBuffers = this.readySessionBuffers;
177 final Set busySessionBuffers = this.busySessionBuffers;
178 final Event event = new Event( type, nextFilter, data );
179
180 synchronized( readySessionBuffers )
181 {
182 final SessionBuffer buf = getSessionBuffer( session );
183 final Queue eventQueue = buf.eventQueue;
184
185 synchronized( buf )
186 {
187 eventQueue.push( event );
188 }
189
190 if( !busySessionBuffers.contains( buf ) )
191 {
192 busySessionBuffers.add( buf );
193 readySessionBuffers.add( buf );
194 }
195 }
196 }
197
198 /***
199 * Implement this method to forward events to <tt>nextFilter</tt>.
200 */
201 protected abstract void processEvent( Object nextFilter, Session session,
202 EventType type, Object data );
203
204 private SessionBuffer getSessionBuffer( Session session )
205 {
206 final Map buffers = this.buffers;
207 SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
208 if( buf == null )
209 {
210 synchronized( buffers )
211 {
212 buf = ( SessionBuffer ) buffers.get( session );
213 if( buf == null )
214 {
215 buf = new SessionBuffer( session );
216 buffers.put( session, buf );
217 }
218 }
219 }
220 return buf;
221 }
222
223 private void removeSessionBuffer( SessionBuffer buf )
224 {
225 final Map buffers = this.buffers;
226 final Session session = buf.session;
227 synchronized( buffers )
228 {
229 buffers.remove( session );
230 }
231 }
232
233 private static class SessionBuffer
234 {
235 private final Session session;
236
237 private final Queue eventQueue = new Queue();
238
239 private SessionBuffer( Session session )
240 {
241 this.session = session;
242 }
243 }
244
245 private class Worker extends Thread
246 {
247 private final Object promotionLock = new Object();
248
249 private Worker()
250 {
251 super( "IoThreadPool-" + ( threadId++ ) );
252 increasePoolSize();
253 }
254
255 public void lead()
256 {
257 final Object promotionLock = this.promotionLock;
258 synchronized( promotionLock )
259 {
260 leader = this;
261 promotionLock.notify();
262 }
263 }
264
265 public void run()
266 {
267 for( ;; )
268 {
269 if( !waitForPromotion() )
270 break;
271
272 SessionBuffer buf = fetchBuffer();
273 giveUpLead();
274
275 if( buf == null )
276 {
277 break;
278 }
279
280 processEvents( buf );
281 follow();
282 releaseBuffer( buf );
283 }
284
285 decreasePoolSize();
286 }
287
288 private SessionBuffer fetchBuffer()
289 {
290 SessionBuffer buf = null;
291 BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
292 synchronized( readySessionBuffers )
293 {
294 do
295 {
296 buf = null;
297 try
298 {
299 readySessionBuffers.waitForNewItem();
300 }
301 catch( InterruptedException e )
302 {
303 break;
304 }
305
306 Iterator it = readySessionBuffers.iterator();
307 if( !it.hasNext() )
308 {
309
310 break;
311 }
312
313 do
314 {
315 buf = null;
316 buf = ( SessionBuffer ) it.next();
317 it.remove();
318 }
319 while( buf != null && buf.eventQueue.isEmpty()
320 && it.hasNext() );
321 }
322 while( buf != null && buf.eventQueue.isEmpty() );
323 }
324
325 return buf;
326 }
327
328 private void processEvents( SessionBuffer buf )
329 {
330 final Session session = buf.session;
331 final Queue eventQueue = buf.eventQueue;
332 for( ;; )
333 {
334 Event event;
335 synchronized( buf )
336 {
337 event = ( Event ) eventQueue.pop();
338 if( event == null )
339 break;
340 }
341 processEvent( event.getNextFilter(), session,
342 event.getType(), event.getData() );
343 }
344 }
345
346 private void follow()
347 {
348 final Object promotionLock = this.promotionLock;
349 final Stack followers = BaseThreadPool.this.followers;
350 synchronized( promotionLock )
351 {
352 if( this != leader )
353 {
354 synchronized( followers )
355 {
356 followers.push( this );
357 }
358 }
359 }
360 }
361
362 private void releaseBuffer( SessionBuffer buf )
363 {
364 final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
365 final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
366 final Queue eventQueue = buf.eventQueue;
367
368 synchronized( readySessionBuffers )
369 {
370 if( eventQueue.isEmpty() )
371 {
372 busySessionBuffers.remove( buf );
373 removeSessionBuffer( buf );
374 }
375 else
376 {
377 readySessionBuffers.add( buf );
378 }
379 }
380 }
381
382 private boolean waitForPromotion()
383 {
384 final Object promotionLock = this.promotionLock;
385
386 synchronized( promotionLock )
387 {
388 if( this != leader )
389 {
390 try
391 {
392 int keepAliveTime = getKeepAliveTime();
393 if( keepAliveTime > 0 )
394 {
395 promotionLock.wait( keepAliveTime );
396 }
397 else
398 {
399 promotionLock.wait();
400 }
401 }
402 catch( InterruptedException e )
403 {
404 }
405 }
406
407 boolean timeToLead = this == leader;
408
409 if( !timeToLead )
410 {
411
412 synchronized( followers )
413 {
414 followers.remove( this );
415 }
416 }
417
418 return timeToLead;
419 }
420 }
421
422 private void giveUpLead()
423 {
424 final Stack followers = BaseThreadPool.this.followers;
425 Worker worker;
426 synchronized( followers )
427 {
428 worker = ( Worker ) followers.pop();
429 }
430
431 if( worker != null )
432 {
433 worker.lead();
434 }
435 else
436 {
437 if( !shuttingDown )
438 {
439 synchronized( BaseThreadPool.this )
440 {
441 if( !shuttingDown
442 && getPoolSize() < getMaximumPoolSize() )
443 {
444 worker = new Worker();
445 worker.start();
446 worker.lead();
447 }
448 }
449 }
450 }
451 }
452 }
453 }