View Javadoc

1   /*
2    *   @(#) $Id: BaseThreadPool.java 210062 2005-07-11 03:52:38Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.util;
20  
21  import java.util.HashSet;
22  import java.util.IdentityHashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.Set;
26  
27  import org.apache.mina.common.Session;
28  
29  /***
30   * A base implementation of Thread-pooling filters.
31   * This filter forwards events to its thread pool.  This is an implementation of
32   * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
33   * thread pool</a> by Douglas C. Schmidt et al.
34   * 
35   * @author Trustin Lee (trustin@apache.org)
36   * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
37   */
38  public abstract class BaseThreadPool implements ThreadPool
39  {
40      /***
41       * Default maximum size of thread pool (2G).
42       */
43      public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
44  
45      /***
46       * Default keep-alive time of thread pool (1 min).
47       */
48      public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
49  
50      private static volatile int threadId = 0;
51  
52      private final Map buffers = new IdentityHashMap();
53  
54      private final Stack followers = new Stack();
55  
56      private final BlockingSet readySessionBuffers = new BlockingSet();
57  
58      private final Set busySessionBuffers = new HashSet();
59  
60      private Worker leader;
61  
62      private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
63  
64      private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
65  
66      private boolean started;
67  
68      private boolean shuttingDown;
69  
70      private int poolSize;
71  
72      private final Object poolSizeLock = new Object();
73  
74      /***
75       * Creates a new instance with default thread pool settings.
76       * You'll have to invoke {@link #start()} method to start threads actually.
77       */
78      protected BaseThreadPool()
79      {
80      }
81  
82      public int getPoolSize()
83      {
84          synchronized( poolSizeLock )
85          {
86              return poolSize;
87          }
88      }
89  
90      public int getMaximumPoolSize()
91      {
92          return maximumPoolSize;
93      }
94  
95      public int getKeepAliveTime()
96      {
97          return keepAliveTime;
98      }
99  
100     public void setMaximumPoolSize( int maximumPoolSize )
101     {
102         if( maximumPoolSize <= 0 )
103             throw new IllegalArgumentException();
104         this.maximumPoolSize = maximumPoolSize;
105     }
106 
107     public void setKeepAliveTime( int keepAliveTime )
108     {
109         this.keepAliveTime = keepAliveTime;
110     }
111 
112     public synchronized void start()
113     {
114         if( started )
115             return;
116 
117         shuttingDown = false;
118 
119         leader = new Worker();
120         leader.start();
121         leader.lead();
122 
123         started = true;
124     }
125 
126     public synchronized void stop()
127     {
128         if( !started )
129             return;
130 
131         shuttingDown = true;
132         Worker lastLeader = null;
133         for( ;; )
134         {
135             Worker leader = this.leader;
136             if( lastLeader == leader )
137                 break;
138 
139             while( leader.isAlive() )
140             {
141                 leader.interrupt();
142                 try
143                 {
144                     leader.join();
145                 }
146                 catch( InterruptedException e )
147                 {
148                 }
149             }
150 
151             lastLeader = leader;
152         }
153 
154         started = false;
155     }
156 
157     private void increasePoolSize()
158     {
159         synchronized( poolSizeLock )
160         {
161             poolSize++;
162         }
163     }
164 
165     private void decreasePoolSize()
166     {
167         synchronized( poolSizeLock )
168         {
169             poolSize--;
170         }
171     }
172 
173     protected void fireEvent( Object nextFilter, Session session,
174                               EventType type, Object data )
175     {
176         final BlockingSet readySessionBuffers = this.readySessionBuffers;
177         final Set busySessionBuffers = this.busySessionBuffers;
178         final Event event = new Event( type, nextFilter, data );
179 
180         synchronized( readySessionBuffers )
181         {
182             final SessionBuffer buf = getSessionBuffer( session );
183             final Queue eventQueue = buf.eventQueue;
184 
185             synchronized( buf )
186             {
187                 eventQueue.push( event );
188             }
189 
190             if( !busySessionBuffers.contains( buf ) )
191             {
192                 busySessionBuffers.add( buf );
193                 readySessionBuffers.add( buf );
194             }
195         }
196     }
197 
198     /***
199      * Implement this method to forward events to <tt>nextFilter</tt>.
200      */
201     protected abstract void processEvent( Object nextFilter, Session session,
202                                           EventType type, Object data );
203 
204     private SessionBuffer getSessionBuffer( Session session )
205     {
206         final Map buffers = this.buffers;
207         SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
208         if( buf == null )
209         {
210             synchronized( buffers )
211             {
212                 buf = ( SessionBuffer ) buffers.get( session );
213                 if( buf == null )
214                 {
215                     buf = new SessionBuffer( session );
216                     buffers.put( session, buf );
217                 }
218             }
219         }
220         return buf;
221     }
222 
223     private void removeSessionBuffer( SessionBuffer buf )
224     {
225         final Map buffers = this.buffers;
226         final Session session = buf.session;
227         synchronized( buffers )
228         {
229             buffers.remove( session );
230         }
231     }
232 
233     private static class SessionBuffer
234     {
235         private final Session session;
236 
237         private final Queue eventQueue = new Queue();
238 
239         private SessionBuffer( Session session )
240         {
241             this.session = session;
242         }
243     }
244 
245     private class Worker extends Thread
246     {
247         private final Object promotionLock = new Object();
248 
249         private Worker()
250         {
251             super( "IoThreadPool-" + ( threadId++ ) );
252             increasePoolSize();
253         }
254 
255         public void lead()
256         {
257             final Object promotionLock = this.promotionLock;
258             synchronized( promotionLock )
259             {
260                 leader = this;
261                 promotionLock.notify();
262             }
263         }
264 
265         public void run()
266         {
267             for( ;; )
268             {
269                 if( !waitForPromotion() )
270                     break;
271 
272                 SessionBuffer buf = fetchBuffer();
273                 giveUpLead();
274 
275                 if( buf == null )
276                 {
277                     break;
278                 }
279 
280                 processEvents( buf );
281                 follow();
282                 releaseBuffer( buf );
283             }
284 
285             decreasePoolSize();
286         }
287 
288         private SessionBuffer fetchBuffer()
289         {
290             SessionBuffer buf = null;
291             BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
292             synchronized( readySessionBuffers )
293             {
294                 do
295                 {
296                     buf = null;
297                     try
298                     {
299                         readySessionBuffers.waitForNewItem();
300                     }
301                     catch( InterruptedException e )
302                     {
303                         break;
304                     }
305 
306                     Iterator it = readySessionBuffers.iterator();
307                     if( !it.hasNext() )
308                     {
309                         // exceeded keepAliveTime
310                         break;
311                     }
312 
313                     do
314                     {
315                         buf = null;
316                         buf = ( SessionBuffer ) it.next();
317                         it.remove();
318                     }
319                     while( buf != null && buf.eventQueue.isEmpty()
320                            && it.hasNext() );
321                 }
322                 while( buf != null && buf.eventQueue.isEmpty() );
323             }
324 
325             return buf;
326         }
327 
328         private void processEvents( SessionBuffer buf )
329         {
330             final Session session = buf.session;
331             final Queue eventQueue = buf.eventQueue;
332             for( ;; )
333             {
334                 Event event;
335                 synchronized( buf )
336                 {
337                     event = ( Event ) eventQueue.pop();
338                     if( event == null )
339                         break;
340                 }
341                 processEvent( event.getNextFilter(), session,
342                               event.getType(), event.getData() );
343             }
344         }
345 
346         private void follow()
347         {
348             final Object promotionLock = this.promotionLock;
349             final Stack followers = BaseThreadPool.this.followers;
350             synchronized( promotionLock )
351             {
352                 if( this != leader )
353                 {
354                     synchronized( followers )
355                     {
356                         followers.push( this );
357                     }
358                 }
359             }
360         }
361 
362         private void releaseBuffer( SessionBuffer buf )
363         {
364             final BlockingSet readySessionBuffers = BaseThreadPool.this.readySessionBuffers;
365             final Set busySessionBuffers = BaseThreadPool.this.busySessionBuffers;
366             final Queue eventQueue = buf.eventQueue;
367 
368             synchronized( readySessionBuffers )
369             {
370                 if( eventQueue.isEmpty() )
371                 {
372                     busySessionBuffers.remove( buf );
373                     removeSessionBuffer( buf );
374                 }
375                 else
376                 {
377                     readySessionBuffers.add( buf );
378                 }
379             }
380         }
381 
382         private boolean waitForPromotion()
383         {
384             final Object promotionLock = this.promotionLock;
385 
386             synchronized( promotionLock )
387             {
388                 if( this != leader )
389                 {
390                     try
391                     {
392                         int keepAliveTime = getKeepAliveTime();
393                         if( keepAliveTime > 0 )
394                         {
395                             promotionLock.wait( keepAliveTime );
396                         }
397                         else
398                         {
399                             promotionLock.wait();
400                         }
401                     }
402                     catch( InterruptedException e )
403                     {
404                     }
405                 }
406 
407                 boolean timeToLead = this == leader;
408 
409                 if( !timeToLead )
410                 {
411                     // time to die
412                     synchronized( followers )
413                     {
414                         followers.remove( this );
415                     }
416                 }
417 
418                 return timeToLead;
419             }
420         }
421 
422         private void giveUpLead()
423         {
424             final Stack followers = BaseThreadPool.this.followers;
425             Worker worker;
426             synchronized( followers )
427             {
428                 worker = ( Worker ) followers.pop();
429             }
430 
431             if( worker != null )
432             {
433                 worker.lead();
434             }
435             else
436             {
437                 if( !shuttingDown )
438                 {
439                     synchronized( BaseThreadPool.this )
440                     {
441                         if( !shuttingDown
442                             && getPoolSize() < getMaximumPoolSize() )
443                         {
444                             worker = new Worker();
445                             worker.start();
446                             worker.lead();
447                         }
448                     }
449                 }
450             }
451         }
452     }
453 }