View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.Executors;
34  
35  import org.apache.mina.core.RuntimeIoException;
36  import org.apache.mina.core.filterchain.IoFilter;
37  import org.apache.mina.core.future.IoFuture;
38  import org.apache.mina.core.service.AbstractIoAcceptor;
39  import org.apache.mina.core.service.IoAcceptor;
40  import org.apache.mina.core.service.IoHandler;
41  import org.apache.mina.core.service.IoProcessor;
42  import org.apache.mina.core.service.SimpleIoProcessorPool;
43  import org.apache.mina.core.session.AbstractIoSession;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * accepting and disposing the server sockets. An {@link Executor} will be used
54   * for running client accepting and an {@link AbstractPollingIoProcessor} will
55   * be used for processing client I/O operations like reading, writing and
56   * closing.
57   * 
58   * All the low level methods for binding, accepting, closing need to be provided
59   * by the subclassing implementation.
60   * 
61   * @see NioSocketAcceptor for a example of implementation
62   * 
63   * @author The Apache MINA Project (dev@mina.apache.org)
64   * @version $Rev: 678335 $, $Date: 2008-06-26 17:58:30 +0200 (jeu, 26 jun 2008)
65   *          $
66   */
67  public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
68          extends AbstractIoAcceptor {
69  
70      private final IoProcessor<T> processor;
71  
72      private final boolean createdProcessor;
73  
74      private final Object lock = new Object();
75  
76      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
77  
78      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
79  
80      private final Map<SocketAddress, H> boundHandles = Collections
81              .synchronizedMap(new HashMap<SocketAddress, H>());
82  
83      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
84  
85      private volatile boolean selectable;
86  
87      private Worker worker;
88  
89      /**
90       * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
91       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
92       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
93       * pool size will be used.
94       * 
95       * @see SimpleIoProcessorPool
96       * 
97       * @param sessionConfig
98       *            the default configuration for the managed {@link IoSession}
99       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
100      *            type.
101      */
102     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
103             Class<? extends IoProcessor<T>> processorClass) {
104         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
105                 true);
106     }
107 
108     /**
109      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
110      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
111      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
112      * systems.
113      * 
114      * @see SimpleIoProcessorPool
115      * 
116      * @param sessionConfig
117      *            the default configuration for the managed {@link IoSession}
118      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
119      *            type.
120      * @param processorCount the amount of processor to instantiate for the pool
121      */
122     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
123             Class<? extends IoProcessor<T>> processorClass, int processorCount) {
124         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass,
125                 processorCount), true);
126     }
127 
128     /**
129      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
130      * session configuration, a default {@link Executor} will be created using
131      * {@link Executors#newCachedThreadPool()}.
132      * 
133      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
134      * 
135      * @param sessionConfig
136      *            the default configuration for the managed {@link IoSession}
137      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
138      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
139      */
140     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
141             IoProcessor<T> processor) {
142         this(sessionConfig, null, processor, false);
143     }
144 
145     /**
146      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
147      * session configuration and an {@link Executor} for handling I/O events. If
148      * null {@link Executor} is provided, a default one will be created using
149      * {@link Executors#newCachedThreadPool()}.
150      * 
151      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
152      * 
153      * @param sessionConfig
154      *            the default configuration for the managed {@link IoSession}
155      * @param executor
156      *            the {@link Executor} used for handling asynchronous execution of I/O
157      *            events. Can be <code>null</code>.
158      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
159      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
160      */
161     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
162             Executor executor, IoProcessor<T> processor) {
163         this(sessionConfig, executor, processor, false);
164     }
165 
166     /**
167      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
168      * session configuration and an {@link Executor} for handling I/O events. If
169      * null {@link Executor} is provided, a default one will be created using
170      * {@link Executors#newCachedThreadPool()}.
171      * 
172      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
173      * 
174      * @param sessionConfig
175      *            the default configuration for the managed {@link IoSession}
176      * @param executor
177      *            the {@link Executor} used for handling asynchronous execution of I/O
178      *            events. Can be <code>null</code>.
179      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
180      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
181      * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed 
182      */
183     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
184             Executor executor, IoProcessor<T> processor,
185             boolean createdProcessor) {
186         super(sessionConfig, executor);
187 
188         if (processor == null) {
189             throw new NullPointerException("processor");
190         }
191 
192         this.processor = processor;
193         this.createdProcessor = createdProcessor;
194 
195         try {
196             init();
197             selectable = true;
198         } catch (RuntimeException e) {
199             throw e;
200         } catch (Exception e) {
201             throw new RuntimeIoException("Failed to initialize.", e);
202         } finally {
203             if (!selectable) {
204                 try {
205                     destroy();
206                 } catch (Exception e) {
207                     ExceptionMonitor.getInstance().exceptionCaught(e);
208                 }
209             }
210         }
211     }
212 
213     /**
214      * Initialize the polling system, will be called at construction time.
215      * @throws Exception any exception thrown by the underlying system calls  
216      */
217     protected abstract void init() throws Exception;
218 
219     /**
220      * Destroy the polling system, will be called when this {@link IoAcceptor}
221      * implementation will be disposed.  
222      * @throws Exception any exception thrown by the underlying systems calls
223      */
224     protected abstract void destroy() throws Exception;
225 
226     /**
227      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
228      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
229      * @return true if one server socket have got incoming client
230      * @throws Exception any exception thrown by the underlying systems calls
231      */
232     protected abstract boolean select() throws Exception;
233 
234     /**
235      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
236      */
237     protected abstract void wakeup();
238 
239     /**
240      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
241      *  during the last {@link #select()} call.
242      * @return the list of server handles ready
243      */
244     protected abstract Iterator<H> selectedHandles();
245 
246     /**
247      * Open a server socket for a given local address.
248      * @param localAddress the associated local address
249      * @return the opened server socket
250      * @throws Exception any exception thrown by the underlying systems calls
251      */
252     protected abstract H open(SocketAddress localAddress) throws Exception;
253 
254     /**
255      * Get the local address associated with a given server socket
256      * @param handle the server socket
257      * @return the local {@link SocketAddress} associated with this handle
258      * @throws Exception any exception thrown by the underlying systems calls
259      */
260     protected abstract SocketAddress localAddress(H handle) throws Exception;
261 
262     /**
263      * Accept a client connection for a server socket and return a new {@link IoSession}
264      * associated with the given {@link IoProcessor}
265      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}  
266      * @param handle the server handle
267      * @return the created {@link IoSession}
268      * @throws Exception any exception thrown by the underlying systems calls
269      */
270     protected abstract T accept(IoProcessor<T> processor, H handle)
271             throws Exception;
272 
273     /**
274      * Close a server socket.
275      * @param handle the server socket
276      * @throws Exception any exception thrown by the underlying systems calls
277      */
278     protected abstract void close(H handle) throws Exception;
279 
280     /**
281      * {@inheritDoc}
282      */
283     @Override
284     protected IoFuture dispose0() throws Exception {
285         unbind();
286         if (!disposalFuture.isDone()) {
287             startupWorker();
288             wakeup();
289         }
290         return disposalFuture;
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     protected final Set<SocketAddress> bind0(
298             List<? extends SocketAddress> localAddresses) throws Exception {
299         AcceptorOperationFuture request = new AcceptorOperationFuture(
300                 localAddresses);
301 
302         // adds the Registration request to the queue for the Workers
303         // to handle
304         registerQueue.add(request);
305 
306         // creates an instance of a Worker and has the local
307         // executor kick it off.
308         startupWorker();
309         wakeup();
310         request.awaitUninterruptibly();
311 
312         if (request.getException() != null) {
313             throw request.getException();
314         }
315 
316         // Update the local addresses.
317         // setLocalAddresses() shouldn't be called from the worker thread
318         // because of deadlock.
319         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
320         for (H handle : boundHandles.values()) {
321             newLocalAddresses.add(localAddress(handle));
322         }
323 
324         return newLocalAddresses;
325     }
326 
327     /**
328      * This method is called by the doBind() and doUnbind()
329      * methods.  If the worker object is not null, presumably
330      * the acceptor is starting up, then the worker object will
331      * be created and kicked off by the executor.  If the worker
332      * object is not null, probably already created and this class
333      * is now working, then nothing will happen and the method
334      * will just return.
335      */
336     private void startupWorker() {
337         if (!selectable) {
338             registerQueue.clear();
339             cancelQueue.clear();
340         }
341 
342         synchronized (lock) {
343             if (worker == null) {
344                 worker = new Worker();
345                 executeWorker(worker);
346             }
347         }
348     }
349 
350     /**
351      * {@inheritDoc}
352      */
353     @Override
354     protected final void unbind0(List<? extends SocketAddress> localAddresses)
355             throws Exception {
356         AcceptorOperationFuture future = new AcceptorOperationFuture(
357                 localAddresses);
358 
359         cancelQueue.add(future);
360         startupWorker();
361         wakeup();
362 
363         future.awaitUninterruptibly();
364         if (future.getException() != null) {
365             throw future.getException();
366         }
367     }
368 
369     /**
370      * This class is called by the startupWorker() method and is
371      * placed into a NamePreservingRunnable class.
372      */
373     private class Worker implements Runnable {
374         public void run() {
375             int nHandles = 0;
376 
377             while (selectable) {
378                 try {
379                     // Detect if we have some keys ready to be processed
380                     boolean selected = select();
381 
382                     // this actually sets the selector to OP_ACCEPT,
383                     // and binds to the port in which this class will
384                     // listen on
385                     nHandles += registerHandles();
386 
387                     if (selected) {
388                         processHandles(selectedHandles());
389                     }
390 
391                     // check to see if any cancellation request has been made.
392                     nHandles -= unregisterHandles();
393 
394                     if (nHandles == 0) {
395                         synchronized (lock) {
396                             if (registerQueue.isEmpty()
397                                     && cancelQueue.isEmpty()) {
398                                 worker = null;
399                                 break;
400                             }
401                         }
402                     }
403                 } catch (Throwable e) {
404                     ExceptionMonitor.getInstance().exceptionCaught(e);
405 
406                     try {
407                         Thread.sleep(1000);
408                     } catch (InterruptedException e1) {
409                         ExceptionMonitor.getInstance().exceptionCaught(e1);
410                     }
411                 }
412             }
413 
414             if (selectable && isDisposing()) {
415                 selectable = false;
416                 try {
417                     if (createdProcessor) {
418                         processor.dispose();
419                     }
420                 } finally {
421                     try {
422                         synchronized (disposalLock) {
423                             if (isDisposing()) {
424                                 destroy();
425                             }
426                         }
427                     } catch (Exception e) {
428                         ExceptionMonitor.getInstance().exceptionCaught(e);
429                     } finally {
430                         disposalFuture.setDone();
431                     }
432                 }
433             }
434         }
435 
436         /**
437          * This method will process new sessions for the Worker class.  All
438          * keys that have had their status updates as per the Selector.selectedKeys()
439          * method will be processed here.  Only keys that are ready to accept
440          * connections are handled here.
441          * <p/>
442          * Session objects are created by making new instances of SocketSessionImpl
443          * and passing the session object to the SocketIoProcessor class.
444          */
445         @SuppressWarnings("unchecked")
446         private void processHandles(Iterator<H> handles) throws Exception {
447             while (handles.hasNext()) {
448                 H handle = handles.next();
449                 handles.remove();
450 
451                 T session = accept(processor, handle);
452                 if (session == null) {
453                     break;
454                 }
455 
456                 finishSessionInitialization(session, null, null);
457 
458                 // add the session to the SocketIoProcessor
459                 session.getProcessor().add(session);
460             }
461         }
462     }
463 
464     /**
465      * Sets up the socket communications.  Sets items such as:
466      * <p/>
467      * Blocking
468      * Reuse address
469      * Receive buffer size
470      * Bind to listen port
471      * Registers OP_ACCEPT for selector
472      */
473     private int registerHandles() {
474         for (;;) {
475             AcceptorOperationFuture future = registerQueue.poll();
476             if (future == null) {
477                 return 0;
478             }
479 
480             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
481             List<SocketAddress> localAddresses = future.getLocalAddresses();
482 
483             try {
484                 for (SocketAddress a : localAddresses) {
485                     H handle = open(a);
486                     newHandles.put(localAddress(handle), handle);
487                 }
488 
489                 boundHandles.putAll(newHandles);
490 
491                 // and notify.
492                 future.setDone();
493                 return newHandles.size();
494             } catch (Exception e) {
495                 future.setException(e);
496             } finally {
497                 // Roll back if failed to bind all addresses.
498                 if (future.getException() != null) {
499                     for (H handle : newHandles.values()) {
500                         try {
501                             close(handle);
502                         } catch (Exception e) {
503                             ExceptionMonitor.getInstance().exceptionCaught(e);
504                         }
505                     }
506                     wakeup();
507                 }
508             }
509         }
510     }
511 
512     /**
513      * This method just checks to see if anything has been placed into the
514      * cancellation queue.  The only thing that should be in the cancelQueue
515      * is CancellationRequest objects and the only place this happens is in
516      * the doUnbind() method.
517      */
518     private int unregisterHandles() {
519         int cancelledHandles = 0;
520         for (;;) {
521             AcceptorOperationFuture future = cancelQueue.poll();
522             if (future == null) {
523                 break;
524             }
525 
526             // close the channels
527             for (SocketAddress a : future.getLocalAddresses()) {
528                 H handle = boundHandles.remove(a);
529                 if (handle == null) {
530                     continue;
531                 }
532 
533                 try {
534                     close(handle);
535                     wakeup(); // wake up again to trigger thread death
536                 } catch (Throwable e) {
537                     ExceptionMonitor.getInstance().exceptionCaught(e);
538                 } finally {
539                     cancelledHandles++;
540                 }
541             }
542 
543             future.setDone();
544         }
545 
546         return cancelledHandles;
547     }
548 
549     /**
550      * {@inheritDoc}
551      */
552     public final IoSession newSession(SocketAddress remoteAddress,
553             SocketAddress localAddress) {
554         throw new UnsupportedOperationException();
555     }
556 }