001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.polling;
021
022import java.net.SocketAddress;
023import java.nio.channels.ClosedSelectorException;
024import java.nio.channels.spi.SelectorProvider;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Queue;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentLinkedQueue;
035import java.util.concurrent.Executor;
036import java.util.concurrent.Executors;
037import java.util.concurrent.Semaphore;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.mina.core.RuntimeIoException;
041import org.apache.mina.core.filterchain.IoFilter;
042import org.apache.mina.core.service.AbstractIoAcceptor;
043import org.apache.mina.core.service.AbstractIoService;
044import org.apache.mina.core.service.IoAcceptor;
045import org.apache.mina.core.service.IoHandler;
046import org.apache.mina.core.service.IoProcessor;
047import org.apache.mina.core.service.SimpleIoProcessorPool;
048import org.apache.mina.core.session.AbstractIoSession;
049import org.apache.mina.core.session.IoSession;
050import org.apache.mina.core.session.IoSessionConfig;
051import org.apache.mina.transport.socket.SocketSessionConfig;
052import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
053import org.apache.mina.util.ExceptionMonitor;
054
055/**
056 * A base class for implementing transport using a polling strategy. The
057 * underlying sockets will be checked in an active loop and woke up when an
058 * socket needed to be processed. This class handle the logic behind binding,
059 * accepting and disposing the server sockets. An {@link Executor} will be used
060 * for running client accepting and an {@link AbstractPollingIoProcessor} will
061 * be used for processing client I/O operations like reading, writing and
062 * closing.
063 * 
064 * All the low level methods for binding, accepting, closing need to be provided
065 * by the subclassing implementation.
066 * 
067 * @see NioSocketAcceptor for a example of implementation
068 * 
069 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
070 */
071public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
072    /** A lock used to protect the selector to be waked up before it's created */
073    private final Semaphore lock = new Semaphore(1);
074
075    private final IoProcessor<S> processor;
076
077    private final boolean createdProcessor;
078
079    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
080
081    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
082
083    private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
084
085    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
086
087    /** A flag set when the acceptor has been created and initialized */
088    private volatile boolean selectable;
089
090    /** The thread responsible of accepting incoming requests */
091    private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
092
093    protected boolean reuseAddress = false;
094
095    /**
096     * Define the number of socket that can wait to be accepted. Default
097     * to 50 (as in the SocketServer default).
098     */
099    protected int backlog = 50;
100
101    /**
102     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
103     * session configuration, a class of {@link IoProcessor} which will be instantiated in a
104     * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
105     * pool size will be used.
106     * 
107     * @see SimpleIoProcessorPool
108     * 
109     * @param sessionConfig
110     *            the default configuration for the managed {@link IoSession}
111     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
112     *            type.
113     */
114    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
115        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
116    }
117
118    /**
119     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
120     * session configuration, a class of {@link IoProcessor} which will be instantiated in a
121     * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
122     * systems.
123     * 
124     * @see SimpleIoProcessorPool
125     * 
126     * @param sessionConfig
127     *            the default configuration for the managed {@link IoSession}
128     * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
129     *            type.
130     * @param processorCount the amount of processor to instantiate for the pool
131     */
132    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
133            int processorCount) {
134        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
135    }
136
137    /**
138     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
139     * session configuration, a class of {@link IoProcessor} which will be instantiated in a
140     * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
141     * systems.
142     *
143     * @see SimpleIoProcessorPool
144     *
145     * @param sessionConfig
146     *            the default configuration for the managed {@link IoSession}
147     * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
148     *            type.
149     * @param processorCount the amount of processor to instantiate for the pool
150     * @param selectorProvider The SelectorProvider to use
151     */
152    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
153            int processorCount, SelectorProvider selectorProvider ) {
154        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
155    }
156
157    /**
158     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
159     * session configuration, a default {@link Executor} will be created using
160     * {@link Executors#newCachedThreadPool()}.
161     * 
162     * @see AbstractIoService
163     * 
164     * @param sessionConfig
165     *            the default configuration for the managed {@link IoSession}
166     * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
167     *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168     */
169    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
170        this(sessionConfig, null, processor, false, null);
171    }
172
173    /**
174     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
175     * default session configuration and an {@link Executor} for handling I/O
176     * events. If a null {@link Executor} is provided, a default one will be
177     * created using {@link Executors#newCachedThreadPool()}.
178     * 
179     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
180     * 
181     * @param sessionConfig
182     *            the default configuration for the managed {@link IoSession}
183     * @param executor
184     *            the {@link Executor} used for handling asynchronous execution
185     *            of I/O events. Can be <code>null</code>.
186     * @param processor
187     *            the {@link IoProcessor} for processing the {@link IoSession}
188     *            of this transport, triggering events to the bound
189     *            {@link IoHandler} and processing the chains of
190     *            {@link IoFilter}
191     */
192    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
193        this(sessionConfig, executor, processor, false, null);
194    }
195
196    /**
197     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
198     * default session configuration and an {@link Executor} for handling I/O
199     * events. If a null {@link Executor} is provided, a default one will be
200     * created using {@link Executors#newCachedThreadPool()}.
201     * 
202     * @see AbstractIoService(IoSessionConfig, Executor)
203     * 
204     * @param sessionConfig
205     *            the default configuration for the managed {@link IoSession}
206     * @param executor
207     *            the {@link Executor} used for handling asynchronous execution
208     *            of I/O events. Can be <code>null</code>.
209     * @param processor
210     *            the {@link IoProcessor} for processing the {@link IoSession}
211     *            of this transport, triggering events to the bound
212     *            {@link IoHandler} and processing the chains of
213     *            {@link IoFilter}
214     * @param createdProcessor
215     *            tagging the processor as automatically created, so it will be
216     *            automatically disposed
217     */
218    private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
219            boolean createdProcessor, SelectorProvider selectorProvider) {
220        super(sessionConfig, executor);
221
222        if (processor == null) {
223            throw new IllegalArgumentException("processor");
224        }
225
226        this.processor = processor;
227        this.createdProcessor = createdProcessor;
228
229        try {
230            // Initialize the selector
231            init(selectorProvider);
232
233            // The selector is now ready, we can switch the
234            // flag to true so that incoming connection can be accepted
235            selectable = true;
236        } catch (RuntimeException e) {
237            throw e;
238        } catch (Exception e) {
239            throw new RuntimeIoException("Failed to initialize.", e);
240        } finally {
241            if (!selectable) {
242                try {
243                    destroy();
244                } catch (Exception e) {
245                    ExceptionMonitor.getInstance().exceptionCaught(e);
246                }
247            }
248        }
249    }
250
251    /**
252     * Initialize the polling system, will be called at construction time.
253     * @throws Exception any exception thrown by the underlying system calls
254     */
255    protected abstract void init() throws Exception;
256
257    /**
258     * Initialize the polling system, will be called at construction time.
259     * 
260     * @param selectorProvider The Selector Provider that will be used by this polling acceptor
261     * @throws Exception any exception thrown by the underlying system calls
262     */
263    protected abstract void init(SelectorProvider selectorProvider) throws Exception;
264
265    /**
266     * Destroy the polling system, will be called when this {@link IoAcceptor}
267     * implementation will be disposed.
268     * @throws Exception any exception thrown by the underlying systems calls
269     */
270    protected abstract void destroy() throws Exception;
271
272    /**
273     * Check for acceptable connections, interrupt when at least a server is ready for accepting.
274     * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
275     * @return The number of sockets having got incoming client
276     * @throws Exception any exception thrown by the underlying systems calls
277     */
278    protected abstract int select() throws Exception;
279
280    /**
281     * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
282     */
283    protected abstract void wakeup();
284
285    /**
286     * {@link Iterator} for the set of server sockets found with acceptable incoming connections
287     *  during the last {@link #select()} call.
288     * @return the list of server handles ready
289     */
290    protected abstract Iterator<H> selectedHandles();
291
292    /**
293     * Open a server socket for a given local address.
294     * @param localAddress the associated local address
295     * @return the opened server socket
296     * @throws Exception any exception thrown by the underlying systems calls
297     */
298    protected abstract H open(SocketAddress localAddress) throws Exception;
299
300    /**
301     * Get the local address associated with a given server socket
302     * @param handle the server socket
303     * @return the local {@link SocketAddress} associated with this handle
304     * @throws Exception any exception thrown by the underlying systems calls
305     */
306    protected abstract SocketAddress localAddress(H handle) throws Exception;
307
308    /**
309     * Accept a client connection for a server socket and return a new {@link IoSession}
310     * associated with the given {@link IoProcessor}
311     * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
312     * @param handle the server handle
313     * @return the created {@link IoSession}
314     * @throws Exception any exception thrown by the underlying systems calls
315     */
316    protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
317
318    /**
319     * Close a server socket.
320     * @param handle the server socket
321     * @throws Exception any exception thrown by the underlying systems calls
322     */
323    protected abstract void close(H handle) throws Exception;
324
325    /**
326     * {@inheritDoc}
327     */
328    @Override
329    protected void dispose0() throws Exception {
330        unbind();
331
332        startupAcceptor();
333        wakeup();
334    }
335
336    /**
337     * {@inheritDoc}
338     */
339    @Override
340    protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
341        // Create a bind request as a Future operation. When the selector
342        // have handled the registration, it will signal this future.
343        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
344
345        // adds the Registration request to the queue for the Workers
346        // to handle
347        registerQueue.add(request);
348
349        // creates the Acceptor instance and has the local
350        // executor kick it off.
351        startupAcceptor();
352
353        // As we just started the acceptor, we have to unblock the select()
354        // in order to process the bind request we just have added to the
355        // registerQueue.
356        try {
357            lock.acquire();
358
359            // Wait a bit to give a chance to the Acceptor thread to do the select()
360            Thread.sleep(10);
361            wakeup();
362        } finally {
363            lock.release();
364        }
365
366        // Now, we wait until this request is completed.
367        request.awaitUninterruptibly();
368
369        if (request.getException() != null) {
370            throw request.getException();
371        }
372
373        // Update the local addresses.
374        // setLocalAddresses() shouldn't be called from the worker thread
375        // because of deadlock.
376        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
377
378        for (H handle : boundHandles.values()) {
379            newLocalAddresses.add(localAddress(handle));
380        }
381
382        return newLocalAddresses;
383    }
384
385    /**
386     * This method is called by the doBind() and doUnbind()
387     * methods.  If the acceptor is null, the acceptor object will
388     * be created and kicked off by the executor.  If the acceptor
389     * object is null, probably already created and this class
390     * is now working, then nothing will happen and the method
391     * will just return.
392     */
393    private void startupAcceptor() throws InterruptedException {
394        // If the acceptor is not ready, clear the queues
395        // TODO : they should already be clean : do we have to do that ?
396        if (!selectable) {
397            registerQueue.clear();
398            cancelQueue.clear();
399        }
400
401        // start the acceptor if not already started
402        Acceptor acceptor = acceptorRef.get();
403
404        if (acceptor == null) {
405            lock.acquire();
406            acceptor = new Acceptor();
407
408            if (acceptorRef.compareAndSet(null, acceptor)) {
409                executeWorker(acceptor);
410            } else {
411                lock.release();
412            }
413        }
414    }
415
416    /**
417     * {@inheritDoc}
418     */
419    @Override
420    protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
421        AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
422
423        cancelQueue.add(future);
424        startupAcceptor();
425        wakeup();
426
427        future.awaitUninterruptibly();
428        if (future.getException() != null) {
429            throw future.getException();
430        }
431    }
432
433    /**
434     * This class is called by the startupAcceptor() method and is
435     * placed into a NamePreservingRunnable class.
436     * It's a thread accepting incoming connections from clients.
437     * The loop is stopped when all the bound handlers are unbound.
438     */
439    private class Acceptor implements Runnable {
440        public void run() {
441            assert (acceptorRef.get() == this);
442
443            int nHandles = 0;
444
445            // Release the lock
446            lock.release();
447
448            while (selectable) {
449                try {
450                    // Detect if we have some keys ready to be processed
451                    // The select() will be woke up if some new connection
452                    // have occurred, or if the selector has been explicitly
453                    // woke up
454                    int selected = select();
455
456                    // this actually sets the selector to OP_ACCEPT,
457                    // and binds to the port on which this class will
458                    // listen on
459                    nHandles += registerHandles();
460
461                    // Now, if the number of registred handles is 0, we can
462                    // quit the loop: we don't have any socket listening
463                    // for incoming connection.
464                    if (nHandles == 0) {
465                        acceptorRef.set(null);
466
467                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
468                            assert (acceptorRef.get() != this);
469                            break;
470                        }
471
472                        if (!acceptorRef.compareAndSet(null, this)) {
473                            assert (acceptorRef.get() != this);
474                            break;
475                        }
476
477                        assert (acceptorRef.get() == this);
478                    }
479
480                    if (selected > 0) {
481                        // We have some connection request, let's process
482                        // them here.
483                        processHandles(selectedHandles());
484                    }
485
486                    // check to see if any cancellation request has been made.
487                    nHandles -= unregisterHandles();
488                } catch (ClosedSelectorException cse) {
489                    // If the selector has been closed, we can exit the loop
490                    ExceptionMonitor.getInstance().exceptionCaught(cse);
491                    break;
492                } catch (Exception e) {
493                    ExceptionMonitor.getInstance().exceptionCaught(e);
494
495                    try {
496                        Thread.sleep(1000);
497                    } catch (InterruptedException e1) {
498                        ExceptionMonitor.getInstance().exceptionCaught(e1);
499                    }
500                }
501            }
502
503            // Cleanup all the processors, and shutdown the acceptor.
504            if (selectable && isDisposing()) {
505                selectable = false;
506                try {
507                    if (createdProcessor) {
508                        processor.dispose();
509                    }
510                } finally {
511                    try {
512                        synchronized (disposalLock) {
513                            if (isDisposing()) {
514                                destroy();
515                            }
516                        }
517                    } catch (Exception e) {
518                        ExceptionMonitor.getInstance().exceptionCaught(e);
519                    } finally {
520                        disposalFuture.setDone();
521                    }
522                }
523            }
524        }
525
526        /**
527         * This method will process new sessions for the Worker class.  All
528         * keys that have had their status updates as per the Selector.selectedKeys()
529         * method will be processed here.  Only keys that are ready to accept
530         * connections are handled here.
531         * <p/>
532         * Session objects are created by making new instances of SocketSessionImpl
533         * and passing the session object to the SocketIoProcessor class.
534         */
535        @SuppressWarnings("unchecked")
536        private void processHandles(Iterator<H> handles) throws Exception {
537            while (handles.hasNext()) {
538                H handle = handles.next();
539                handles.remove();
540
541                // Associates a new created connection to a processor,
542                // and get back a session
543                S session = accept(processor, handle);
544
545                if (session == null) {
546                    continue;
547                }
548
549                initSession(session, null, null);
550
551                // add the session to the SocketIoProcessor
552                session.getProcessor().add(session);
553            }
554        }
555    }
556
557    /**
558     * Sets up the socket communications.  Sets items such as:
559     * <p/>
560     * Blocking
561     * Reuse address
562     * Receive buffer size
563     * Bind to listen port
564     * Registers OP_ACCEPT for selector
565     */
566    private int registerHandles() {
567        for (;;) {
568            // The register queue contains the list of services to manage
569            // in this acceptor.
570            AcceptorOperationFuture future = registerQueue.poll();
571
572            if (future == null) {
573                return 0;
574            }
575
576            // We create a temporary map to store the bound handles,
577            // as we may have to remove them all if there is an exception
578            // during the sockets opening.
579            Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
580            List<SocketAddress> localAddresses = future.getLocalAddresses();
581
582            try {
583                // Process all the addresses
584                for (SocketAddress a : localAddresses) {
585                    H handle = open(a);
586                    newHandles.put(localAddress(handle), handle);
587                }
588
589                // Everything went ok, we can now update the map storing
590                // all the bound sockets.
591                boundHandles.putAll(newHandles);
592
593                // and notify.
594                future.setDone();
595                return newHandles.size();
596            } catch (Exception e) {
597                // We store the exception in the future
598                future.setException(e);
599            } finally {
600                // Roll back if failed to bind all addresses.
601                if (future.getException() != null) {
602                    for (H handle : newHandles.values()) {
603                        try {
604                            close(handle);
605                        } catch (Exception e) {
606                            ExceptionMonitor.getInstance().exceptionCaught(e);
607                        }
608                    }
609
610                    // TODO : add some comment : what is the wakeup() waking up ?
611                    wakeup();
612                }
613            }
614        }
615    }
616
617    /**
618     * This method just checks to see if anything has been placed into the
619     * cancellation queue.  The only thing that should be in the cancelQueue
620     * is CancellationRequest objects and the only place this happens is in
621     * the doUnbind() method.
622     */
623    private int unregisterHandles() {
624        int cancelledHandles = 0;
625        for (;;) {
626            AcceptorOperationFuture future = cancelQueue.poll();
627            if (future == null) {
628                break;
629            }
630
631            // close the channels
632            for (SocketAddress a : future.getLocalAddresses()) {
633                H handle = boundHandles.remove(a);
634
635                if (handle == null) {
636                    continue;
637                }
638
639                try {
640                    close(handle);
641                    wakeup(); // wake up again to trigger thread death
642                } catch (Exception e) {
643                    ExceptionMonitor.getInstance().exceptionCaught(e);
644                } finally {
645                    cancelledHandles++;
646                }
647            }
648
649            future.setDone();
650        }
651
652        return cancelledHandles;
653    }
654
655    /**
656     * {@inheritDoc}
657     */
658    public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
659        throw new UnsupportedOperationException();
660    }
661
662    /**
663     * @return the backLog
664     */
665    public int getBacklog() {
666        return backlog;
667    }
668
669    /**
670     * Sets the Backlog parameter
671     * 
672     * @param backlog
673     *            the backlog variable
674     */
675    public void setBacklog(int backlog) {
676        synchronized (bindLock) {
677            if (isActive()) {
678                throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
679            }
680
681            this.backlog = backlog;
682        }
683    }
684
685    /**
686     * @return the flag that sets the reuseAddress information
687     */
688    public boolean isReuseAddress() {
689        return reuseAddress;
690    }
691
692    /**
693     * Set the Reuse Address flag
694     * 
695     * @param reuseAddress
696     *            The flag to set
697     */
698    public void setReuseAddress(boolean reuseAddress) {
699        synchronized (bindLock) {
700            if (isActive()) {
701                throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
702            }
703
704            this.reuseAddress = reuseAddress;
705        }
706    }
707
708    /**
709     * {@inheritDoc}
710     */
711    public SocketSessionConfig getSessionConfig() {
712        return (SocketSessionConfig)sessionConfig;
713    }
714}