001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.polling;
021
022import java.net.SocketAddress;
023import java.nio.channels.ClosedSelectorException;
024import java.nio.channels.spi.SelectorProvider;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Queue;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentLinkedQueue;
035import java.util.concurrent.Executor;
036import java.util.concurrent.Executors;
037import java.util.concurrent.Semaphore;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.mina.core.RuntimeIoException;
041import org.apache.mina.core.filterchain.IoFilter;
042import org.apache.mina.core.service.AbstractIoAcceptor;
043import org.apache.mina.core.service.AbstractIoService;
044import org.apache.mina.core.service.IoAcceptor;
045import org.apache.mina.core.service.IoHandler;
046import org.apache.mina.core.service.IoProcessor;
047import org.apache.mina.core.service.SimpleIoProcessorPool;
048import org.apache.mina.core.session.AbstractIoSession;
049import org.apache.mina.core.session.IoSession;
050import org.apache.mina.core.session.IoSessionConfig;
051import org.apache.mina.transport.socket.SocketSessionConfig;
052import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
053import org.apache.mina.util.ExceptionMonitor;
054
055/**
 * A base class for implementing a transport using a polling strategy. The
 * underlying sockets are checked in an active loop, which is woken up when a
 * socket needs to be processed. This class handles the logic behind binding,
 * accepting and disposing the server sockets. An {@link Executor} is used
 * for running the client-accepting loop and an {@link AbstractPollingIoProcessor}
 * is used for processing client I/O operations like reading, writing and
 * closing.
063 * 
064 * All the low level methods for binding, accepting, closing need to be provided
065 * by the subclassing implementation.
066 * 
 * @see NioSocketAcceptor for an example implementation
068 * 
069 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
070 */
071public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
072    /** A lock used to protect the selector to be waked up before it's created */
073    private final Semaphore lock = new Semaphore(1);
074
075    private final IoProcessor<S> processor;
076
077    private final boolean createdProcessor;
078
079    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
080
081    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
082
083    private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
084
085    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
086
087    /** A flag set when the acceptor has been created and initialized */
088    private volatile boolean selectable;
089
090    /** The thread responsible of accepting incoming requests */
091    private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>();
092
093    protected boolean reuseAddress = false;
094
095    /**
096     * Define the number of socket that can wait to be accepted. Default
097     * to 50 (as in the SocketServer default).
098     */
099    protected int backlog = 50;
100
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
     * session configuration and a class of {@link IoProcessor} which will be instantiated in a
     * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The pool is
     * created without an explicit size, so its default pool size will be used.
     * <p>
     * No {@link Executor} and no {@link SelectorProvider} are supplied ({@code null} is passed
     * for both), and the created pool is flagged as owned by this acceptor.
     *
     * @see SimpleIoProcessorPool
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
     *            type.
     */
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
    }
117
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
     * session configuration and a class of {@link IoProcessor} which will be instantiated in a
     * {@link SimpleIoProcessorPool} using multiple threads for better scaling in multiprocessor
     * systems.
     * <p>
     * No {@link Executor} and no {@link SelectorProvider} are supplied ({@code null} is passed
     * for both), and the created pool is flagged as owned by this acceptor.
     *
     * @see SimpleIoProcessorPool
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
     *            type.
     * @param processorCount the number of processors to instantiate for the pool
     */
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
            int processorCount) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
    }
136
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
     * session configuration and a class of {@link IoProcessor} which will be instantiated in a
     * {@link SimpleIoProcessorPool} using multiple threads for better scaling in multiprocessor
     * systems.
     * <p>
     * The given {@link SelectorProvider} is handed both to the created pool and to
     * {@link #init(SelectorProvider)}. No {@link Executor} is supplied ({@code null} is passed),
     * and the created pool is flagged as owned by this acceptor.
     *
     * @see SimpleIoProcessorPool
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
     *            type.
     * @param processorCount the number of processors to instantiate for the pool
     * @param selectorProvider The SelectorProvider to use
     */
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
            int processorCount, SelectorProvider selectorProvider ) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
    }
156
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
     * session configuration and the {@link IoProcessor} to use. No {@link Executor} is
     * supplied ({@code null} is passed to the parent class); according to the original
     * documentation a default one is then created using
     * {@link Executors#newCachedThreadPool()}.
     * <p>
     * The given processor is not flagged as owned by this acceptor, so it will not be
     * disposed automatically.
     *
     * @see AbstractIoService
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
     *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
     */
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
        this(sessionConfig, null, processor, false, null);
    }
172
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
     * default session configuration and an {@link Executor} for handling I/O
     * events. If a null {@link Executor} is provided, a default one will be
     * created using {@link Executors#newCachedThreadPool()} (behaviour of the
     * parent class, as stated by the original documentation).
     * <p>
     * The given processor is not flagged as owned by this acceptor, so it will
     * not be disposed automatically.
     *
     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param executor
     *            the {@link Executor} used for handling asynchronous execution
     *            of I/O events. Can be <code>null</code>.
     * @param processor
     *            the {@link IoProcessor} for processing the {@link IoSession}
     *            of this transport, triggering events to the bound
     *            {@link IoHandler} and processing the chains of
     *            {@link IoFilter}
     */
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
        this(sessionConfig, executor, processor, false, null);
    }
195
    /**
     * Constructor for {@link AbstractPollingIoAcceptor}, to which all the other
     * constructors delegate. You need to provide a default session configuration
     * and an {@link Executor} for handling I/O events. If a null {@link Executor}
     * is provided, a default one will be created using
     * {@link Executors#newCachedThreadPool()} (behaviour of the parent class, as
     * stated by the original documentation).
     * <p>
     * The polling system is initialized through {@link #init(SelectorProvider)}.
     * If the initialization fails, {@link #destroy()} is called before the
     * failure is propagated.
     *
     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
     *
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param executor
     *            the {@link Executor} used for handling asynchronous execution
     *            of I/O events. Can be <code>null</code>.
     * @param processor
     *            the {@link IoProcessor} for processing the {@link IoSession}
     *            of this transport, triggering events to the bound
     *            {@link IoHandler} and processing the chains of
     *            {@link IoFilter}
     * @param createdProcessor
     *            tagging the processor as automatically created, so it will be
     *            automatically disposed
     * @param selectorProvider
     *            the {@link SelectorProvider} handed to {@link #init(SelectorProvider)}.
     *            Can be <code>null</code>.
     * @throws IllegalArgumentException if the processor is <code>null</code>
     * @throws RuntimeIoException if the initialization fails with a checked exception
     */
    private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
            boolean createdProcessor, SelectorProvider selectorProvider) {
        super(sessionConfig, executor);

        if (processor == null) {
            throw new IllegalArgumentException("processor");
        }

        this.processor = processor;
        this.createdProcessor = createdProcessor;

        try {
            // Initialize the selector
            init(selectorProvider);

            // The selector is now ready, we can switch the
            // flag to true so that incoming connection can be accepted
            selectable = true;
        } catch (RuntimeException e) {
            // Unchecked exceptions are propagated untouched
            throw e;
        } catch (Exception e) {
            // Checked exceptions are wrapped, keeping the cause
            throw new RuntimeIoException("Failed to initialize.", e);
        } finally {
            // The flag is still false only if init() failed: release
            // whatever the partial initialization may have allocated.
            if (!selectable) {
                try {
                    destroy();
                } catch (Exception e) {
                    // Reported, but must not mask the initialization failure
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }
    }
250
    /**
     * Initialize the polling system.
     * <p>
     * NOTE(review): the constructor of this class only invokes
     * {@link #init(SelectorProvider)}; this no-argument variant is not called
     * from this class - confirm how the implementations use it.
     *
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init() throws Exception;
256
    /**
     * Initialize the polling system. Called at construction time, before the
     * acceptor is flagged as ready to accept incoming connections.
     *
     * @param selectorProvider The Selector Provider that will be used by this polling acceptor.
     *            It is <code>null</code> when the acceptor has been created through a
     *            constructor that does not take a {@link SelectorProvider}.
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(SelectorProvider selectorProvider) throws Exception;
264
    /**
     * Destroy the polling system. Called when this {@link IoAcceptor}
     * implementation is being disposed, and also from the constructor when
     * the initialization has failed.
     *
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract void destroy() throws Exception;
271
    /**
     * Check for acceptable connections, returning when at least one server is ready
     * for accepting or when the selection has been interrupted by {@link #wakeup()}.
     * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}.
     *
     * @return The number of server sockets having got incoming clients
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract int select() throws Exception;
279
    /**
     * Interrupt the {@link #select()} method. Used when the poll set needs to be
     * modified, i.e. after a bind or an unbind request has been queued.
     */
    protected abstract void wakeup();
284
    /**
     * {@link Iterator} over the set of server sockets found with acceptable incoming
     * connections during the last {@link #select()} call. The acceptor removes each
     * handle through {@link Iterator#remove()} while processing it, so the returned
     * iterator has to support removal.
     *
     * @return an iterator over the server handles that are ready
     */
    protected abstract Iterator<H> selectedHandles();
291
    /**
     * Open a server socket for a given local address. Called by the acceptor
     * worker for each address of a bind request.
     *
     * @param localAddress the associated local address
     * @return the opened server socket
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract H open(SocketAddress localAddress) throws Exception;
299
    /**
     * Get the local address associated with a given server socket. The returned
     * address is the key under which the handle is tracked by this acceptor.
     *
     * @param handle the server socket
     * @return the local {@link SocketAddress} associated with this handle
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract SocketAddress localAddress(H handle) throws Exception;
307
    /**
     * Accept a client connection for a server socket and return a new {@link IoSession}
     * associated with the given {@link IoProcessor}.
     *
     * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
     * @param handle the server handle
     * @return the created {@link IoSession}, or <code>null</code> if no session could be
     *         created, in which case the handle is simply skipped by the acceptor
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
317
    /**
     * Close a server socket. Called when an address is unbound, and when a
     * bind request has to be rolled back.
     *
     * @param handle the server socket
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract void close(H handle) throws Exception;
324
    /**
     * {@inheritDoc}
     * <p>
     * Unbinds everything, then makes sure an acceptor worker is running and
     * wakes it up: the actual cleanup (processor disposal and call to
     * {@link #destroy()}) is done by the worker when it leaves its loop.
     */
    @Override
    protected void dispose0() throws Exception {
        unbind();

        // The worker may have stopped once the last handle was unbound:
        // (re)start one and interrupt its select() so that it runs the cleanup.
        startupAcceptor();
        wakeup();
    }
335
336    /**
337     * {@inheritDoc}
338     */
339    @Override
340    protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
341        // Create a bind request as a Future operation. When the selector
342        // have handled the registration, it will signal this future.
343        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
344
345        // adds the Registration request to the queue for the Workers
346        // to handle
347        registerQueue.add(request);
348
349        // creates the Acceptor instance and has the local
350        // executor kick it off.
351        startupAcceptor();
352
353        // As we just started the acceptor, we have to unblock the select()
354        // in order to process the bind request we just have added to the
355        // registerQueue.
356        try {
357            lock.acquire();
358
359            wakeup();
360        } finally {
361            lock.release();
362        }
363
364        // Now, we wait until this request is completed.
365        request.awaitUninterruptibly();
366
367        if (request.getException() != null) {
368            throw request.getException();
369        }
370
371        // Update the local addresses.
372        // setLocalAddresses() shouldn't be called from the worker thread
373        // because of deadlock.
374        Set<SocketAddress> newLocalAddresses = new HashSet<>();
375
376        for (H handle : boundHandles.values()) {
377            newLocalAddresses.add(localAddress(handle));
378        }
379
380        return newLocalAddresses;
381    }
382
383    /**
384     * This method is called by the doBind() and doUnbind()
385     * methods.  If the acceptor is null, the acceptor object will
386     * be created and kicked off by the executor.  If the acceptor
387     * object is null, probably already created and this class
388     * is now working, then nothing will happen and the method
389     * will just return.
390     */
391    private void startupAcceptor() throws InterruptedException {
392        // If the acceptor is not ready, clear the queues
393        // TODO : they should already be clean : do we have to do that ?
394        if (!selectable) {
395            registerQueue.clear();
396            cancelQueue.clear();
397        }
398
399        // start the acceptor if not already started
400        Acceptor acceptor = acceptorRef.get();
401
402        if (acceptor == null) {
403            lock.acquire();
404            acceptor = new Acceptor();
405
406            if (acceptorRef.compareAndSet(null, acceptor)) {
407                executeWorker(acceptor);
408            } else {
409                lock.release();
410            }
411        }
412    }
413
414    /**
415     * {@inheritDoc}
416     */
417    @Override
418    protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
419        AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
420
421        cancelQueue.add(future);
422        startupAcceptor();
423        wakeup();
424
425        future.awaitUninterruptibly();
426        if (future.getException() != null) {
427            throw future.getException();
428        }
429    }
430
    /**
     * The worker accepting incoming connections from clients. An instance is
     * created by the startupAcceptor() method and handed to the executor.
     * <p>
     * The worker loops on registering the pending bind requests, selecting,
     * accepting the incoming connections and processing the pending unbind
     * requests. The loop is stopped when all the bound handles are unbound and
     * no request is pending, or when the selector is closed. When the service
     * is being disposed, the worker also releases the resources before it ends.
     */
    private class Acceptor implements Runnable {
        /**
         * Runs the accept loop, then the disposal cleanup when applicable.
         */
        public void run() {
            assert (acceptorRef.get() == this);

            // The number of handles this worker is currently listening on
            int nHandles = 0;

            // Release the permit acquired by startupAcceptor(): the worker is
            // now running, the threads waiting in bindInternal() may proceed.
            lock.release();

            while (selectable) {
                try {
                    // Process the bound sockets to this acceptor.
                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on. We do that before the select because
                    // the registerQueue containing the new handler is
                    // already updated at this point.
                    nHandles += registerHandles();

                    // Detect if we have some keys ready to be processed
                    // The select() will be woken up if some new connections
                    // have occurred, or if the selector has been explicitly
                    // woken up
                    int selected = select();

                    // Now, if the number of registered handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                        // Give up the acceptor role first, so that a request
                        // arriving from now on starts a new worker...
                        acceptorRef.set(null);

                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                            assert (acceptorRef.get() != this);
                            break;
                        }

                        // ...but some requests were queued in the meantime:
                        // try to take the role back. If it fails, another
                        // worker has been registered and will handle them.
                        if (!acceptorRef.compareAndSet(null, this)) {
                            assert (acceptorRef.get() != this);
                            break;
                        }

                        assert (acceptorRef.get() == this);
                    }

                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Pause for a second before the next iteration after a failure
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // NOTE(review): the interruption is only reported, the
                        // interrupt status of the thread is not restored -
                        // confirm this is intended.
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    // Only a processor created by this acceptor is disposed
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        // Whatever happened, signal that the disposal is over
                        disposalFuture.setDone();
                    }
                }
            }
        }

        /**
         * Processes the server handles that have been selected as ready to
         * accept a connection. Each handle is removed from the iterator, then
         * a connection is accepted on it; when a session is obtained, it is
         * initialized and added to its processor.
         *
         * @param handles the iterator over the ready server handles
         * @throws Exception any exception thrown while accepting a connection
         *         or initializing the session
         */
        @SuppressWarnings("unchecked")
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                S session = accept(processor, handle);

                // No session has been created for this handle: skip it
                if (session == null) {
                    continue;
                }

                initSession(session, null, null);

                // add the session to the processor it is associated with
                session.getProcessor().add(session);
            }
        }
    }
557
558    /**
559     * Sets up the socket communications.  Sets items such as:
560     * <p/>
561     * Blocking
562     * Reuse address
563     * Receive buffer size
564     * Bind to listen port
565     * Registers OP_ACCEPT for selector
566     */
567    private int registerHandles() {
568        for (;;) {
569            // The register queue contains the list of services to manage
570            // in this acceptor.
571            AcceptorOperationFuture future = registerQueue.poll();
572
573            if (future == null) {
574                return 0;
575            }
576
577            // We create a temporary map to store the bound handles,
578            // as we may have to remove them all if there is an exception
579            // during the sockets opening.
580            Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
581            List<SocketAddress> localAddresses = future.getLocalAddresses();
582
583            try {
584                // Process all the addresses
585                for (SocketAddress a : localAddresses) {
586                    H handle = open(a);
587                    newHandles.put(localAddress(handle), handle);
588                }
589
590                // Everything went ok, we can now update the map storing
591                // all the bound sockets.
592                boundHandles.putAll(newHandles);
593
594                // and notify.
595                future.setDone();
596                
597                return newHandles.size();
598            } catch (Exception e) {
599                // We store the exception in the future
600                future.setException(e);
601            } finally {
602                // Roll back if failed to bind all addresses.
603                if (future.getException() != null) {
604                    for (H handle : newHandles.values()) {
605                        try {
606                            close(handle);
607                        } catch (Exception e) {
608                            ExceptionMonitor.getInstance().exceptionCaught(e);
609                        }
610                    }
611
612                    // Wake up the selector to be sure we will process the newly bound handle
613                    // and not block forever in the select()
614                    wakeup();
615                }
616            }
617        }
618    }
619
620    /**
621     * This method just checks to see if anything has been placed into the
622     * cancellation queue.  The only thing that should be in the cancelQueue
623     * is CancellationRequest objects and the only place this happens is in
624     * the doUnbind() method.
625     */
626    private int unregisterHandles() {
627        int cancelledHandles = 0;
628        for (;;) {
629            AcceptorOperationFuture future = cancelQueue.poll();
630            if (future == null) {
631                break;
632            }
633
634            // close the channels
635            for (SocketAddress a : future.getLocalAddresses()) {
636                H handle = boundHandles.remove(a);
637
638                if (handle == null) {
639                    continue;
640                }
641
642                try {
643                    close(handle);
644                    wakeup(); // wake up again to trigger thread death
645                } catch (Exception e) {
646                    ExceptionMonitor.getInstance().exceptionCaught(e);
647                } finally {
648                    cancelledHandles++;
649                }
650            }
651
652            future.setDone();
653        }
654
655        return cancelledHandles;
656    }
657
    /**
     * {@inheritDoc}
     * <p>
     * This operation is not supported by this acceptor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
        throw new UnsupportedOperationException();
    }
665
    /**
     * @return the backlog, i.e. the number of sockets that can wait to be accepted
     */
    public int getBacklog() {
        return backlog;
    }
672
    /**
     * Sets the backlog parameter. It can only be changed while the acceptor
     * is not active.
     *
     * @param backlog
     *            the number of sockets that can wait to be accepted
     * @throws IllegalStateException if the acceptor is active
     */
    public void setBacklog(int backlog) {
        // Checked and updated under the bind lock
        synchronized (bindLock) {
            if (isActive()) {
                throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
            }

            this.backlog = backlog;
        }
    }
688
    /**
     * @return the value of the reuseAddress flag
     */
    public boolean isReuseAddress() {
        return reuseAddress;
    }
695
696    /**
697     * Set the Reuse Address flag
698     * 
699     * @param reuseAddress
700     *            The flag to set
701     */
702    public void setReuseAddress(boolean reuseAddress) {
703        synchronized (bindLock) {
704            if (isActive()) {
705                throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
706            }
707
708            this.reuseAddress = reuseAddress;
709        }
710    }
711
    /**
     * {@inheritDoc}
     * <p>
     * The session configuration of this service is returned as a
     * {@link SocketSessionConfig}; the cast fails with a
     * {@link ClassCastException} if the configuration is of another type.
     */
    @Override
    public SocketSessionConfig getSessionConfig() {
        return (SocketSessionConfig)sessionConfig;
    }
719}