View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.nio.channels.spi.SelectorProvider;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Semaphore;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.mina.core.RuntimeIoException;
41  import org.apache.mina.core.filterchain.IoFilter;
42  import org.apache.mina.core.service.AbstractIoAcceptor;
43  import org.apache.mina.core.service.AbstractIoService;
44  import org.apache.mina.core.service.IoAcceptor;
45  import org.apache.mina.core.service.IoHandler;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.SimpleIoProcessorPool;
48  import org.apache.mina.core.session.AbstractIoSession;
49  import org.apache.mina.core.session.IoSession;
50  import org.apache.mina.core.session.IoSessionConfig;
51  import org.apache.mina.transport.socket.SocketSessionConfig;
52  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
53  import org.apache.mina.util.ExceptionMonitor;
54  
55  /**
56   * A base class for implementing transport using a polling strategy. The
57   * underlying sockets will be checked in an active loop and woke up when an
58   * socket needed to be processed. This class handle the logic behind binding,
59   * accepting and disposing the server sockets. An {@link Executor} will be used
60   * for running client accepting and an {@link AbstractPollingIoProcessor} will
61   * be used for processing client I/O operations like reading, writing and
62   * closing.
63   * 
64   * All the low level methods for binding, accepting, closing need to be provided
65   * by the subclassing implementation.
66   * 
67   * @see NioSocketAcceptor for a example of implementation
68   * 
69   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
70   */
71  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
72      /** A lock used to protect the selector to be waked up before it's created */
73      private final Semaphore lock = new Semaphore(1);
74  
75      private final IoProcessor<S> processor;
76  
77      private final boolean createdProcessor;
78  
79      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
80  
81      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
82  
83      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
84  
85      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
86  
87      /** A flag set when the acceptor has been created and initialized */
88      private volatile boolean selectable;
89  
90      /** The thread responsible of accepting incoming requests */
91      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
92  
93      protected boolean reuseAddress = false;
94  
95      /**
96       * Define the number of socket that can wait to be accepted. Default
97       * to 50 (as in the SocketServer default).
98       */
99      protected int backlog = 50;
100 
101     /**
102      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
103      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
104      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
105      * pool size will be used.
106      * 
107      * @see SimpleIoProcessorPool
108      * 
109      * @param sessionConfig
110      *            the default configuration for the managed {@link IoSession}
111      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
112      *            type.
113      */
114     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
115         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
116     }
117 
118     /**
119      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
120      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
121      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
122      * systems.
123      * 
124      * @see SimpleIoProcessorPool
125      * 
126      * @param sessionConfig
127      *            the default configuration for the managed {@link IoSession}
128      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
129      *            type.
130      * @param processorCount the amount of processor to instantiate for the pool
131      */
132     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
133             int processorCount) {
134         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
135     }
136 
137     /**
138      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
139      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
140      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
141      * systems.
142      *
143      * @see SimpleIoProcessorPool
144      *
145      * @param sessionConfig
146      *            the default configuration for the managed {@link IoSession}
147      * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
148      *            type.
149      * @param processorCount the amount of processor to instantiate for the pool
150      * @param selectorProvider The SelectorProvider to use
151      */
152     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
153             int processorCount, SelectorProvider selectorProvider ) {
154         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
155     }
156 
157     /**
158      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
159      * session configuration, a default {@link Executor} will be created using
160      * {@link Executors#newCachedThreadPool()}.
161      * 
162      * @see AbstractIoService
163      * 
164      * @param sessionConfig
165      *            the default configuration for the managed {@link IoSession}
166      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
167      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168      */
169     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
170         this(sessionConfig, null, processor, false, null);
171     }
172 
173     /**
174      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
175      * default session configuration and an {@link Executor} for handling I/O
176      * events. If a null {@link Executor} is provided, a default one will be
177      * created using {@link Executors#newCachedThreadPool()}.
178      * 
179      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
180      * 
181      * @param sessionConfig
182      *            the default configuration for the managed {@link IoSession}
183      * @param executor
184      *            the {@link Executor} used for handling asynchronous execution
185      *            of I/O events. Can be <code>null</code>.
186      * @param processor
187      *            the {@link IoProcessor} for processing the {@link IoSession}
188      *            of this transport, triggering events to the bound
189      *            {@link IoHandler} and processing the chains of
190      *            {@link IoFilter}
191      */
192     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
193         this(sessionConfig, executor, processor, false, null);
194     }
195 
196     /**
197      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
198      * default session configuration and an {@link Executor} for handling I/O
199      * events. If a null {@link Executor} is provided, a default one will be
200      * created using {@link Executors#newCachedThreadPool()}.
201      * 
202      * @see AbstractIoService(IoSessionConfig, Executor)
203      * 
204      * @param sessionConfig
205      *            the default configuration for the managed {@link IoSession}
206      * @param executor
207      *            the {@link Executor} used for handling asynchronous execution
208      *            of I/O events. Can be <code>null</code>.
209      * @param processor
210      *            the {@link IoProcessor} for processing the {@link IoSession}
211      *            of this transport, triggering events to the bound
212      *            {@link IoHandler} and processing the chains of
213      *            {@link IoFilter}
214      * @param createdProcessor
215      *            tagging the processor as automatically created, so it will be
216      *            automatically disposed
217      */
218     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
219             boolean createdProcessor, SelectorProvider selectorProvider) {
220         super(sessionConfig, executor);
221 
222         if (processor == null) {
223             throw new IllegalArgumentException("processor");
224         }
225 
226         this.processor = processor;
227         this.createdProcessor = createdProcessor;
228 
229         try {
230             // Initialize the selector
231             init(selectorProvider);
232 
233             // The selector is now ready, we can switch the
234             // flag to true so that incoming connection can be accepted
235             selectable = true;
236         } catch (RuntimeException e) {
237             throw e;
238         } catch (Exception e) {
239             throw new RuntimeIoException("Failed to initialize.", e);
240         } finally {
241             if (!selectable) {
242                 try {
243                     destroy();
244                 } catch (Exception e) {
245                     ExceptionMonitor.getInstance().exceptionCaught(e);
246                 }
247             }
248         }
249     }
250 
251     /**
252      * Initialize the polling system, will be called at construction time.
253      * @throws Exception any exception thrown by the underlying system calls
254      */
255     protected abstract void init() throws Exception;
256 
257     /**
258      * Initialize the polling system, will be called at construction time.
259      * 
260      * @param selectorProvider The Selector Provider that will be used by this polling acceptor
261      * @throws Exception any exception thrown by the underlying system calls
262      */
263     protected abstract void init(SelectorProvider selectorProvider) throws Exception;
264 
265     /**
266      * Destroy the polling system, will be called when this {@link IoAcceptor}
267      * implementation will be disposed.
268      * @throws Exception any exception thrown by the underlying systems calls
269      */
270     protected abstract void destroy() throws Exception;
271 
272     /**
273      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
274      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
275      * @return The number of sockets having got incoming client
276      * @throws Exception any exception thrown by the underlying systems calls
277      */
278     protected abstract int select() throws Exception;
279 
280     /**
281      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
282      */
283     protected abstract void wakeup();
284 
285     /**
286      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
287      *  during the last {@link #select()} call.
288      * @return the list of server handles ready
289      */
290     protected abstract Iterator<H> selectedHandles();
291 
292     /**
293      * Open a server socket for a given local address.
294      * @param localAddress the associated local address
295      * @return the opened server socket
296      * @throws Exception any exception thrown by the underlying systems calls
297      */
298     protected abstract H open(SocketAddress localAddress) throws Exception;
299 
300     /**
301      * Get the local address associated with a given server socket
302      * @param handle the server socket
303      * @return the local {@link SocketAddress} associated with this handle
304      * @throws Exception any exception thrown by the underlying systems calls
305      */
306     protected abstract SocketAddress localAddress(H handle) throws Exception;
307 
308     /**
309      * Accept a client connection for a server socket and return a new {@link IoSession}
310      * associated with the given {@link IoProcessor}
311      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
312      * @param handle the server handle
313      * @return the created {@link IoSession}
314      * @throws Exception any exception thrown by the underlying systems calls
315      */
316     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
317 
318     /**
319      * Close a server socket.
320      * @param handle the server socket
321      * @throws Exception any exception thrown by the underlying systems calls
322      */
323     protected abstract void close(H handle) throws Exception;
324 
325     /**
326      * {@inheritDoc}
327      */
328     @Override
329     protected void dispose0() throws Exception {
330         unbind();
331 
332         startupAcceptor();
333         wakeup();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     @Override
340     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
341         // Create a bind request as a Future operation. When the selector
342         // have handled the registration, it will signal this future.
343         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
344 
345         // adds the Registration request to the queue for the Workers
346         // to handle
347         registerQueue.add(request);
348 
349         // creates the Acceptor instance and has the local
350         // executor kick it off.
351         startupAcceptor();
352 
353         // As we just started the acceptor, we have to unblock the select()
354         // in order to process the bind request we just have added to the
355         // registerQueue.
356         try {
357             lock.acquire();
358 
359             // Wait a bit to give a chance to the Acceptor thread to do the select()
360             Thread.sleep(10);
361             wakeup();
362         } finally {
363             lock.release();
364         }
365 
366         // Now, we wait until this request is completed.
367         request.awaitUninterruptibly();
368 
369         if (request.getException() != null) {
370             throw request.getException();
371         }
372 
373         // Update the local addresses.
374         // setLocalAddresses() shouldn't be called from the worker thread
375         // because of deadlock.
376         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
377 
378         for (H handle : boundHandles.values()) {
379             newLocalAddresses.add(localAddress(handle));
380         }
381 
382         return newLocalAddresses;
383     }
384 
385     /**
386      * This method is called by the doBind() and doUnbind()
387      * methods.  If the acceptor is null, the acceptor object will
388      * be created and kicked off by the executor.  If the acceptor
389      * object is null, probably already created and this class
390      * is now working, then nothing will happen and the method
391      * will just return.
392      */
393     private void startupAcceptor() throws InterruptedException {
394         // If the acceptor is not ready, clear the queues
395         // TODO : they should already be clean : do we have to do that ?
396         if (!selectable) {
397             registerQueue.clear();
398             cancelQueue.clear();
399         }
400 
401         // start the acceptor if not already started
402         Acceptor acceptor = acceptorRef.get();
403 
404         if (acceptor == null) {
405             lock.acquire();
406             acceptor = new Acceptor();
407 
408             if (acceptorRef.compareAndSet(null, acceptor)) {
409                 executeWorker(acceptor);
410             } else {
411                 lock.release();
412             }
413         }
414     }
415 
416     /**
417      * {@inheritDoc}
418      */
419     @Override
420     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
421         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
422 
423         cancelQueue.add(future);
424         startupAcceptor();
425         wakeup();
426 
427         future.awaitUninterruptibly();
428         if (future.getException() != null) {
429             throw future.getException();
430         }
431     }
432 
433     /**
434      * This class is called by the startupAcceptor() method and is
435      * placed into a NamePreservingRunnable class.
436      * It's a thread accepting incoming connections from clients.
437      * The loop is stopped when all the bound handlers are unbound.
438      */
439     private class Acceptor implements Runnable {
440         public void run() {
441             assert (acceptorRef.get() == this);
442 
443             int nHandles = 0;
444 
445             // Release the lock
446             lock.release();
447 
448             while (selectable) {
449                 try {
450                     // Detect if we have some keys ready to be processed
451                     // The select() will be woke up if some new connection
452                     // have occurred, or if the selector has been explicitly
453                     // woke up
454                     int selected = select();
455 
456                     // this actually sets the selector to OP_ACCEPT,
457                     // and binds to the port on which this class will
458                     // listen on
459                     nHandles += registerHandles();
460 
461                     // Now, if the number of registred handles is 0, we can
462                     // quit the loop: we don't have any socket listening
463                     // for incoming connection.
464                     if (nHandles == 0) {
465                         acceptorRef.set(null);
466 
467                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
468                             assert (acceptorRef.get() != this);
469                             break;
470                         }
471 
472                         if (!acceptorRef.compareAndSet(null, this)) {
473                             assert (acceptorRef.get() != this);
474                             break;
475                         }
476 
477                         assert (acceptorRef.get() == this);
478                     }
479 
480                     if (selected > 0) {
481                         // We have some connection request, let's process
482                         // them here.
483                         processHandles(selectedHandles());
484                     }
485 
486                     // check to see if any cancellation request has been made.
487                     nHandles -= unregisterHandles();
488                 } catch (ClosedSelectorException cse) {
489                     // If the selector has been closed, we can exit the loop
490                     ExceptionMonitor.getInstance().exceptionCaught(cse);
491                     break;
492                 } catch (Exception e) {
493                     ExceptionMonitor.getInstance().exceptionCaught(e);
494 
495                     try {
496                         Thread.sleep(1000);
497                     } catch (InterruptedException e1) {
498                         ExceptionMonitor.getInstance().exceptionCaught(e1);
499                     }
500                 }
501             }
502 
503             // Cleanup all the processors, and shutdown the acceptor.
504             if (selectable && isDisposing()) {
505                 selectable = false;
506                 try {
507                     if (createdProcessor) {
508                         processor.dispose();
509                     }
510                 } finally {
511                     try {
512                         synchronized (disposalLock) {
513                             if (isDisposing()) {
514                                 destroy();
515                             }
516                         }
517                     } catch (Exception e) {
518                         ExceptionMonitor.getInstance().exceptionCaught(e);
519                     } finally {
520                         disposalFuture.setDone();
521                     }
522                 }
523             }
524         }
525 
526         /**
527          * This method will process new sessions for the Worker class.  All
528          * keys that have had their status updates as per the Selector.selectedKeys()
529          * method will be processed here.  Only keys that are ready to accept
530          * connections are handled here.
531          * <p/>
532          * Session objects are created by making new instances of SocketSessionImpl
533          * and passing the session object to the SocketIoProcessor class.
534          */
535         @SuppressWarnings("unchecked")
536         private void processHandles(Iterator<H> handles) throws Exception {
537             while (handles.hasNext()) {
538                 H handle = handles.next();
539                 handles.remove();
540 
541                 // Associates a new created connection to a processor,
542                 // and get back a session
543                 S session = accept(processor, handle);
544 
545                 if (session == null) {
546                     continue;
547                 }
548 
549                 initSession(session, null, null);
550 
551                 // add the session to the SocketIoProcessor
552                 session.getProcessor().add(session);
553             }
554         }
555     }
556 
557     /**
558      * Sets up the socket communications.  Sets items such as:
559      * <p/>
560      * Blocking
561      * Reuse address
562      * Receive buffer size
563      * Bind to listen port
564      * Registers OP_ACCEPT for selector
565      */
566     private int registerHandles() {
567         for (;;) {
568             // The register queue contains the list of services to manage
569             // in this acceptor.
570             AcceptorOperationFuture future = registerQueue.poll();
571 
572             if (future == null) {
573                 return 0;
574             }
575 
576             // We create a temporary map to store the bound handles,
577             // as we may have to remove them all if there is an exception
578             // during the sockets opening.
579             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
580             List<SocketAddress> localAddresses = future.getLocalAddresses();
581 
582             try {
583                 // Process all the addresses
584                 for (SocketAddress a : localAddresses) {
585                     H handle = open(a);
586                     newHandles.put(localAddress(handle), handle);
587                 }
588 
589                 // Everything went ok, we can now update the map storing
590                 // all the bound sockets.
591                 boundHandles.putAll(newHandles);
592 
593                 // and notify.
594                 future.setDone();
595                 return newHandles.size();
596             } catch (Exception e) {
597                 // We store the exception in the future
598                 future.setException(e);
599             } finally {
600                 // Roll back if failed to bind all addresses.
601                 if (future.getException() != null) {
602                     for (H handle : newHandles.values()) {
603                         try {
604                             close(handle);
605                         } catch (Exception e) {
606                             ExceptionMonitor.getInstance().exceptionCaught(e);
607                         }
608                     }
609 
610                     // TODO : add some comment : what is the wakeup() waking up ?
611                     wakeup();
612                 }
613             }
614         }
615     }
616 
617     /**
618      * This method just checks to see if anything has been placed into the
619      * cancellation queue.  The only thing that should be in the cancelQueue
620      * is CancellationRequest objects and the only place this happens is in
621      * the doUnbind() method.
622      */
623     private int unregisterHandles() {
624         int cancelledHandles = 0;
625         for (;;) {
626             AcceptorOperationFuture future = cancelQueue.poll();
627             if (future == null) {
628                 break;
629             }
630 
631             // close the channels
632             for (SocketAddress a : future.getLocalAddresses()) {
633                 H handle = boundHandles.remove(a);
634 
635                 if (handle == null) {
636                     continue;
637                 }
638 
639                 try {
640                     close(handle);
641                     wakeup(); // wake up again to trigger thread death
642                 } catch (Exception e) {
643                     ExceptionMonitor.getInstance().exceptionCaught(e);
644                 } finally {
645                     cancelledHandles++;
646                 }
647             }
648 
649             future.setDone();
650         }
651 
652         return cancelledHandles;
653     }
654 
655     /**
656      * {@inheritDoc}
657      */
658     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
659         throw new UnsupportedOperationException();
660     }
661 
662     /**
663      * @return the backLog
664      */
665     public int getBacklog() {
666         return backlog;
667     }
668 
669     /**
670      * Sets the Backlog parameter
671      * 
672      * @param backlog
673      *            the backlog variable
674      */
675     public void setBacklog(int backlog) {
676         synchronized (bindLock) {
677             if (isActive()) {
678                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
679             }
680 
681             this.backlog = backlog;
682         }
683     }
684 
685     /**
686      * @return the flag that sets the reuseAddress information
687      */
688     public boolean isReuseAddress() {
689         return reuseAddress;
690     }
691 
692     /**
693      * Set the Reuse Address flag
694      * 
695      * @param reuseAddress
696      *            The flag to set
697      */
698     public void setReuseAddress(boolean reuseAddress) {
699         synchronized (bindLock) {
700             if (isActive()) {
701                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
702             }
703 
704             this.reuseAddress = reuseAddress;
705         }
706     }
707 
708     /**
709      * {@inheritDoc}
710      */
711     public SocketSessionConfig getSessionConfig() {
712         return (SocketSessionConfig)sessionConfig;
713     }
714 }