View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.nio.channels.spi.SelectorProvider;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Semaphore;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.mina.core.RuntimeIoException;
41  import org.apache.mina.core.filterchain.IoFilter;
42  import org.apache.mina.core.service.AbstractIoAcceptor;
43  import org.apache.mina.core.service.AbstractIoService;
44  import org.apache.mina.core.service.IoAcceptor;
45  import org.apache.mina.core.service.IoHandler;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.SimpleIoProcessorPool;
48  import org.apache.mina.core.session.AbstractIoSession;
49  import org.apache.mina.core.session.IoSession;
50  import org.apache.mina.core.session.IoSessionConfig;
51  import org.apache.mina.transport.socket.SocketSessionConfig;
52  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
53  import org.apache.mina.util.ExceptionMonitor;
54  
55  /**
56   * A base class for implementing transport using a polling strategy. The
57   * underlying sockets will be checked in an active loop and woke up when an
58   * socket needed to be processed. This class handle the logic behind binding,
59   * accepting and disposing the server sockets. An {@link Executor} will be used
60   * for running client accepting and an {@link AbstractPollingIoProcessor} will
61   * be used for processing client I/O operations like reading, writing and
62   * closing.
63   * 
64   * All the low level methods for binding, accepting, closing need to be provided
65   * by the subclassing implementation.
66   * 
67   * @see NioSocketAcceptor for a example of implementation
68   * @param <H> The type of IoHandler
69   * @param <S> The type of IoSession
70   * 
71   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
72   */
73  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
74      /** A lock used to protect the selector to be waked up before it's created */
75      private final Semaphore lock = new Semaphore(1);
76  
77      private final IoProcessor<S> processor;
78  
79      private final boolean createdProcessor;
80  
81      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
82  
83      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
84  
85      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
86  
87      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
88  
89      /** A flag set when the acceptor has been created and initialized */
90      private volatile boolean selectable;
91  
92      /** The thread responsible of accepting incoming requests */
93      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>();
94  
95      protected boolean reuseAddress = false;
96  
97      /**
98       * Define the number of socket that can wait to be accepted. Default
99       * to 50 (as in the SocketServer default).
100      */
101     protected int backlog = 50;
102 
103     /**
104      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
105      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
106      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
107      * pool size will be used.
108      * 
109      * @see SimpleIoProcessorPool
110      * 
111      * @param sessionConfig
112      *            the default configuration for the managed {@link IoSession}
113      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
114      *            type.
115      */
116     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
117         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
118     }
119 
120     /**
121      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
122      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
123      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
124      * systems.
125      * 
126      * @see SimpleIoProcessorPool
127      * 
128      * @param sessionConfig
129      *            the default configuration for the managed {@link IoSession}
130      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
131      *            type.
132      * @param processorCount the amount of processor to instantiate for the pool
133      */
134     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
135             int processorCount) {
136         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
137     }
138 
139     /**
140      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
141      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
142      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
143      * systems.
144      *
145      * @see SimpleIoProcessorPool
146      *
147      * @param sessionConfig
148      *            the default configuration for the managed {@link IoSession}
149      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
150      *            type.
151      * @param processorCount the amount of processor to instantiate for the pool
152      * @param selectorProvider The SelectorProvider to use
153      */
154     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
155             int processorCount, SelectorProvider selectorProvider ) {
156         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
157     }
158 
159     /**
160      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
161      * session configuration, a default {@link Executor} will be created using
162      * {@link Executors#newCachedThreadPool()}.
163      * 
164      * @see AbstractIoService
165      * 
166      * @param sessionConfig
167      *            the default configuration for the managed {@link IoSession}
168      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
169      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
170      */
171     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
172         this(sessionConfig, null, processor, false, null);
173     }
174 
175     /**
176      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
177      * default session configuration and an {@link Executor} for handling I/O
178      * events. If a null {@link Executor} is provided, a default one will be
179      * created using {@link Executors#newCachedThreadPool()}.
180      * 
181      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
182      * 
183      * @param sessionConfig
184      *            the default configuration for the managed {@link IoSession}
185      * @param executor
186      *            the {@link Executor} used for handling asynchronous execution
187      *            of I/O events. Can be <code>null</code>.
188      * @param processor
189      *            the {@link IoProcessor} for processing the {@link IoSession}
190      *            of this transport, triggering events to the bound
191      *            {@link IoHandler} and processing the chains of
192      *            {@link IoFilter}
193      */
194     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
195         this(sessionConfig, executor, processor, false, null);
196     }
197 
198     /**
199      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
200      * default session configuration and an {@link Executor} for handling I/O
201      * events. If a null {@link Executor} is provided, a default one will be
202      * created using {@link Executors#newCachedThreadPool()}.
203      * 
204      * @see #AbstractIoService(IoSessionConfig, Executor)
205      * 
206      * @param sessionConfig
207      *            the default configuration for the managed {@link IoSession}
208      * @param executor
209      *            the {@link Executor} used for handling asynchronous execution
210      *            of I/O events. Can be <code>null</code>.
211      * @param processor
212      *            the {@link IoProcessor} for processing the {@link IoSession}
213      *            of this transport, triggering events to the bound
214      *            {@link IoHandler} and processing the chains of
215      *            {@link IoFilter}
216      * @param createdProcessor
217      *            tagging the processor as automatically created, so it will be
218      *            automatically disposed
219      */
220     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
221             boolean createdProcessor, SelectorProvider selectorProvider) {
222         super(sessionConfig, executor);
223 
224         if (processor == null) {
225             throw new IllegalArgumentException("processor");
226         }
227 
228         this.processor = processor;
229         this.createdProcessor = createdProcessor;
230 
231         try {
232             // Initialize the selector
233             init(selectorProvider);
234 
235             // The selector is now ready, we can switch the
236             // flag to true so that incoming connection can be accepted
237             selectable = true;
238         } catch (RuntimeException e) {
239             throw e;
240         } catch (Exception e) {
241             throw new RuntimeIoException("Failed to initialize.", e);
242         } finally {
243             if (!selectable) {
244                 try {
245                     destroy();
246                 } catch (Exception e) {
247                     ExceptionMonitor.getInstance().exceptionCaught(e);
248                 }
249             }
250         }
251     }
252 
253     /**
254      * Initialize the polling system, will be called at construction time.
255      * @throws Exception any exception thrown by the underlying system calls
256      */
257     protected abstract void init() throws Exception;
258 
259     /**
260      * Initialize the polling system, will be called at construction time.
261      * 
262      * @param selectorProvider The Selector Provider that will be used by this polling acceptor
263      * @throws Exception any exception thrown by the underlying system calls
264      */
265     protected abstract void init(SelectorProvider selectorProvider) throws Exception;
266 
267     /**
268      * Destroy the polling system, will be called when this {@link IoAcceptor}
269      * implementation will be disposed.
270      * @throws Exception any exception thrown by the underlying systems calls
271      */
272     protected abstract void destroy() throws Exception;
273 
274     /**
275      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
276      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
277      * @return The number of sockets having got incoming client
278      * @throws Exception any exception thrown by the underlying systems calls
279      */
280     protected abstract int select() throws Exception;
281 
282     /**
283      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
284      */
285     protected abstract void wakeup();
286 
287     /**
288      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
289      *  during the last {@link #select()} call.
290      * @return the list of server handles ready
291      */
292     protected abstract Iterator<H> selectedHandles();
293 
294     /**
295      * Open a server socket for a given local address.
296      * @param localAddress the associated local address
297      * @return the opened server socket
298      * @throws Exception any exception thrown by the underlying systems calls
299      */
300     protected abstract H open(SocketAddress localAddress) throws Exception;
301 
302     /**
303      * Get the local address associated with a given server socket
304      * @param handle the server socket
305      * @return the local {@link SocketAddress} associated with this handle
306      * @throws Exception any exception thrown by the underlying systems calls
307      */
308     protected abstract SocketAddress localAddress(H handle) throws Exception;
309 
310     /**
311      * Accept a client connection for a server socket and return a new {@link IoSession}
312      * associated with the given {@link IoProcessor}
313      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
314      * @param handle the server handle
315      * @return the created {@link IoSession}
316      * @throws Exception any exception thrown by the underlying systems calls
317      */
318     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
319 
320     /**
321      * Close a server socket.
322      * @param handle the server socket
323      * @throws Exception any exception thrown by the underlying systems calls
324      */
325     protected abstract void close(H handle) throws Exception;
326 
327     /**
328      * {@inheritDoc}
329      */
330     @Override
331     protected void dispose0() throws Exception {
332         unbind();
333 
334         startupAcceptor();
335         wakeup();
336     }
337 
338     /**
339      * {@inheritDoc}
340      */
341     @Override
342     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
343         // Create a bind request as a Future operation. When the selector
344         // have handled the registration, it will signal this future.
345         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
346 
347         // adds the Registration request to the queue for the Workers
348         // to handle
349         registerQueue.add(request);
350 
351         // creates the Acceptor instance and has the local
352         // executor kick it off.
353         startupAcceptor();
354 
355         // As we just started the acceptor, we have to unblock the select()
356         // in order to process the bind request we just have added to the
357         // registerQueue.
358         try {
359             lock.acquire();
360 
361             wakeup();
362         } finally {
363             lock.release();
364         }
365 
366         // Now, we wait until this request is completed.
367         request.awaitUninterruptibly();
368 
369         if (request.getException() != null) {
370             throw request.getException();
371         }
372 
373         // Update the local addresses.
374         // setLocalAddresses() shouldn't be called from the worker thread
375         // because of deadlock.
376         Set<SocketAddress> newLocalAddresses = new HashSet<>();
377 
378         for (H handle : boundHandles.values()) {
379             newLocalAddresses.add(localAddress(handle));
380         }
381 
382         return newLocalAddresses;
383     }
384 
385     /**
386      * This method is called by the doBind() and doUnbind()
387      * methods.  If the acceptor is null, the acceptor object will
388      * be created and kicked off by the executor.  If the acceptor
389      * object is null, probably already created and this class
390      * is now working, then nothing will happen and the method
391      * will just return.
392      */
393     private void startupAcceptor() throws InterruptedException {
394         // If the acceptor is not ready, clear the queues
395         // TODO : they should already be clean : do we have to do that ?
396         if (!selectable) {
397             registerQueue.clear();
398             cancelQueue.clear();
399         }
400 
401         // start the acceptor if not already started
402         Acceptor acceptor = acceptorRef.get();
403 
404         if (acceptor == null) {
405             lock.acquire();
406             acceptor = new Acceptor();
407 
408             if (acceptorRef.compareAndSet(null, acceptor)) {
409                 executeWorker(acceptor);
410             } else {
411                 lock.release();
412             }
413         }
414     }
415 
416     /**
417      * {@inheritDoc}
418      */
419     @Override
420     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
421         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
422 
423         cancelQueue.add(future);
424         startupAcceptor();
425         wakeup();
426 
427         future.awaitUninterruptibly();
428         if (future.getException() != null) {
429             throw future.getException();
430         }
431     }
432 
433     /**
434      * This class is called by the startupAcceptor() method and is
435      * placed into a NamePreservingRunnable class.
436      * It's a thread accepting incoming connections from clients.
437      * The loop is stopped when all the bound handlers are unbound.
438      */
439     private class Acceptor implements Runnable {
440         /**
441          * {@inheritDoc}
442          */
443         @Override
444         public void run() {
445             assert acceptorRef.get() == this;
446 
447             int nHandles = 0;
448 
449             // Release the lock
450             lock.release();
451 
452             while (selectable) {
453                 try {
454                     // Process the bound sockets to this acceptor.
455                     // this actually sets the selector to OP_ACCEPT,
456                     // and binds to the port on which this class will
457                     // listen on. We do that before the select because 
458                     // the registerQueue containing the new handler is
459                     // already updated at this point.
460                     nHandles += registerHandles();
461 
462                     // Detect if we have some keys ready to be processed
463                     // The select() will be woke up if some new connection
464                     // have occurred, or if the selector has been explicitly
465                     // woke up
466                     int selected = select();
467 
468                     // Now, if the number of registered handles is 0, we can
469                     // quit the loop: we don't have any socket listening
470                     // for incoming connection.
471                     if (nHandles == 0) {
472                         acceptorRef.set(null);
473 
474                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
475                             assert acceptorRef.get() != this;
476                             break;
477                         }
478 
479                         if (!acceptorRef.compareAndSet(null, this)) {
480                             assert acceptorRef.get() != this;
481                             break;
482                         }
483 
484                         assert acceptorRef.get() == this;
485                     }
486 
487                     if (selected > 0) {
488                         // We have some connection request, let's process
489                         // them here.
490                         processHandles(selectedHandles());
491                     }
492 
493                     // check to see if any cancellation request has been made.
494                     nHandles -= unregisterHandles();
495                 } catch (ClosedSelectorException cse) {
496                     // If the selector has been closed, we can exit the loop
497                     ExceptionMonitor.getInstance().exceptionCaught(cse);
498                     break;
499                 } catch (Exception e) {
500                     ExceptionMonitor.getInstance().exceptionCaught(e);
501 
502                     try {
503                         Thread.sleep(1000);
504                     } catch (InterruptedException e1) {
505                         ExceptionMonitor.getInstance().exceptionCaught(e1);
506                     }
507                 }
508             }
509 
510             // Cleanup all the processors, and shutdown the acceptor.
511             if (selectable && isDisposing()) {
512                 selectable = false;
513                 try {
514                     if (createdProcessor) {
515                         processor.dispose();
516                     }
517                 } finally {
518                     try {
519                         synchronized (disposalLock) {
520                             if (isDisposing()) {
521                                 destroy();
522                             }
523                         }
524                     } catch (Exception e) {
525                         ExceptionMonitor.getInstance().exceptionCaught(e);
526                     } finally {
527                         disposalFuture.setDone();
528                     }
529                 }
530             }
531         }
532 
533         /**
534          * This method will process new sessions for the Worker class.  All
535          * keys that have had their status updates as per the Selector.selectedKeys()
536          * method will be processed here.  Only keys that are ready to accept
537          * connections are handled here.
538          * <p/>
539          * Session objects are created by making new instances of SocketSessionImpl
540          * and passing the session object to the SocketIoProcessor class.
541          */
542         @SuppressWarnings("unchecked")
543         private void processHandles(Iterator<H> handles) throws Exception {
544             while (handles.hasNext()) {
545                 H handle = handles.next();
546                 handles.remove();
547 
548                 // Associates a new created connection to a processor,
549                 // and get back a session
550                 S session = accept(processor, handle);
551 
552                 if (session == null) {
553                     continue;
554                 }
555 
556                 initSession(session, null, null);
557 
558                 // add the session to the SocketIoProcessor
559                 session.getProcessor().add(session);
560             }
561         }
562 
563         /**
564          * Sets up the socket communications.  Sets items such as:
565          * <p/>
566          * Blocking
567          * Reuse address
568          * Receive buffer size
569          * Bind to listen port
570          * Registers OP_ACCEPT for selector
571          */
572         private int registerHandles() {
573             for (;;) {
574                 // The register queue contains the list of services to manage
575                 // in this acceptor.
576                 AcceptorOperationFuture future = registerQueue.poll();
577 
578                 if (future == null) {
579                     return 0;
580                 }
581 
582                 // We create a temporary map to store the bound handles,
583                 // as we may have to remove them all if there is an exception
584                 // during the sockets opening.
585                 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
586                 List<SocketAddress> localAddresses = future.getLocalAddresses();
587 
588                 try {
589                     // Process all the addresses
590                     for (SocketAddress a : localAddresses) {
591                         H handle = open(a);
592                         newHandles.put(localAddress(handle), handle);
593                     }
594 
595                     // Everything went ok, we can now update the map storing
596                     // all the bound sockets.
597                     boundHandles.putAll(newHandles);
598 
599                     // and notify.
600                     future.setDone();
601                     
602                     return newHandles.size();
603                 } catch (Exception e) {
604                     // We store the exception in the future
605                     future.setException(e);
606                 } finally {
607                     // Roll back if failed to bind all addresses.
608                     if (future.getException() != null) {
609                         for (H handle : newHandles.values()) {
610                             try {
611                                 close(handle);
612                             } catch (Exception e) {
613                                 ExceptionMonitor.getInstance().exceptionCaught(e);
614                             }
615                         }
616 
617                         // Wake up the selector to be sure we will process the newly bound handle
618                         // and not block forever in the select()
619                         wakeup();
620                     }
621                 }
622             }
623         }
624 
625         /**
626          * This method just checks to see if anything has been placed into the
627          * cancellation queue.  The only thing that should be in the cancelQueue
628          * is CancellationRequest objects and the only place this happens is in
629          * the doUnbind() method.
630          */
631         private int unregisterHandles() {
632             int cancelledHandles = 0;
633             for (;;) {
634                 AcceptorOperationFuture future = cancelQueue.poll();
635                 if (future == null) {
636                     break;
637                 }
638 
639                 // close the channels
640                 for (SocketAddress a : future.getLocalAddresses()) {
641                     H handle = boundHandles.remove(a);
642 
643                     if (handle == null) {
644                         continue;
645                     }
646 
647                     try {
648                         close(handle);
649                         wakeup(); // wake up again to trigger thread death
650                     } catch (Exception e) {
651                         ExceptionMonitor.getInstance().exceptionCaught(e);
652                     } finally {
653                         cancelledHandles++;
654                     }
655                 }
656 
657                 future.setDone();
658             }
659 
660             return cancelledHandles;
661         }
662     }
663 
664     /**
665      * {@inheritDoc}
666      */
667     @Override
668     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
669         throw new UnsupportedOperationException();
670     }
671 
672     /**
673      * @return the backLog
674      */
675     public int getBacklog() {
676         return backlog;
677     }
678 
679     /**
680      * Sets the Backlog parameter
681      * 
682      * @param backlog
683      *            the backlog variable
684      */
685     public void setBacklog(int backlog) {
686         synchronized (bindLock) {
687             if (isActive()) {
688                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
689             }
690 
691             this.backlog = backlog;
692         }
693     }
694 
695     /**
696      * @return the flag that sets the reuseAddress information
697      */
698     public boolean isReuseAddress() {
699         return reuseAddress;
700     }
701 
702     /**
703      * Set the Reuse Address flag
704      * 
705      * @param reuseAddress
706      *            The flag to set
707      */
708     public void setReuseAddress(boolean reuseAddress) {
709         synchronized (bindLock) {
710             if (isActive()) {
711                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
712             }
713 
714             this.reuseAddress = reuseAddress;
715         }
716     }
717 
718     /**
719      * {@inheritDoc}
720      */
721     @Override
722     public SocketSessionConfig getSessionConfig() {
723         return (SocketSessionConfig)sessionConfig;
724     }
725 }