View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.Executors;
34  
35  import org.apache.mina.core.RuntimeIoException;
36  import org.apache.mina.core.filterchain.IoFilter;
37  import org.apache.mina.core.future.IoFuture;
38  import org.apache.mina.core.service.AbstractIoAcceptor;
39  import org.apache.mina.core.service.IoAcceptor;
40  import org.apache.mina.core.service.IoHandler;
41  import org.apache.mina.core.service.IoProcessor;
42  import org.apache.mina.core.service.SimpleIoProcessorPool;
43  import org.apache.mina.core.session.AbstractIoSession;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * accepting and disposing the server sockets. An {@link Executor} will be used
54   * for running client accepting and an {@link AbstractPollingIoProcessor} will
55   * be used for processing client I/O operations like reading, writing and
56   * closing.
57   * 
58   * All the low level methods for binding, accepting, closing need to be provided
59   * by the subclassing implementation.
60   * 
61   * @see NioSocketAcceptor for a example of implementation
62   * 
63   * @author The Apache MINA Project (dev@mina.apache.org)
64   */
65  public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
66          extends AbstractIoAcceptor {
67  
68      private final IoProcessor<T> processor;
69  
70      private final boolean createdProcessor;
71  
72      private final Object lock = new Object();
73  
74      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
75  
76      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
77  
78      private final Map<SocketAddress, H> boundHandles = Collections
79              .synchronizedMap(new HashMap<SocketAddress, H>());
80  
81      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
82  
83      /** A flag set when the acceptor has been created and initialized */
84      private volatile boolean selectable;
85  
86      /** The thread responsible of accepting incoming requests */ 
87      private Acceptor acceptor;
88  
89      /**
90       * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
91       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
92       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
93       * pool size will be used.
94       * 
95       * @see SimpleIoProcessorPool
96       * 
97       * @param sessionConfig
98       *            the default configuration for the managed {@link IoSession}
99       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
100      *            type.
101      */
102     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
103             Class<? extends IoProcessor<T>> processorClass) {
104         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
105                 true);
106     }
107 
108     /**
109      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
110      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
111      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
112      * systems.
113      * 
114      * @see SimpleIoProcessorPool
115      * 
116      * @param sessionConfig
117      *            the default configuration for the managed {@link IoSession}
118      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
119      *            type.
120      * @param processorCount the amount of processor to instantiate for the pool
121      */
122     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
123             Class<? extends IoProcessor<T>> processorClass, int processorCount) {
124         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass,
125                 processorCount), true);
126     }
127 
128     /**
129      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
130      * session configuration, a default {@link Executor} will be created using
131      * {@link Executors#newCachedThreadPool()}.
132      * 
133      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
134      * 
135      * @param sessionConfig
136      *            the default configuration for the managed {@link IoSession}
137      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
138      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
139      */
140     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
141             IoProcessor<T> processor) {
142         this(sessionConfig, null, processor, false);
143     }
144 
145     /**
146      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
147      * session configuration and an {@link Executor} for handling I/O events. If a
148      * null {@link Executor} is provided, a default one will be created using
149      * {@link Executors#newCachedThreadPool()}.
150      * 
151      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
152      * 
153      * @param sessionConfig
154      *            the default configuration for the managed {@link IoSession}
155      * @param executor
156      *            the {@link Executor} used for handling asynchronous execution of I/O
157      *            events. Can be <code>null</code>.
158      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
159      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
160      */
161     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
162             Executor executor, IoProcessor<T> processor) {
163         this(sessionConfig, executor, processor, false);
164     }
165 
166     /**
167      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
168      * session configuration and an {@link Executor} for handling I/O events. If a
169      * null {@link Executor} is provided, a default one will be created using
170      * {@link Executors#newCachedThreadPool()}.
171      * 
172      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
173      * 
174      * @param sessionConfig
175      *            the default configuration for the managed {@link IoSession}
176      * @param executor
177      *            the {@link Executor} used for handling asynchronous execution of I/O
178      *            events. Can be <code>null</code>.
179      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of 
180      * this transport, triggering events to the bound {@link IoHandler} and processing 
181      * the chains of {@link IoFilter}
182      * @param createdProcessor tagging the processor as automatically created, so it 
183      * will be automatically disposed 
184      */
185     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
186             Executor executor, IoProcessor<T> processor,
187             boolean createdProcessor) {
188         super(sessionConfig, executor);
189 
190         if (processor == null) {
191             throw new NullPointerException("processor");
192         }
193 
194         this.processor = processor;
195         this.createdProcessor = createdProcessor;
196 
197         try {
198             // Initialize the selector
199             init();
200             
201             // The selector is now ready, we can switch the
202             // flag to true so that incoming connection can be accepted
203             selectable = true;
204         } catch (RuntimeException e) {
205             throw e;
206         } catch (Exception e) {
207             throw new RuntimeIoException("Failed to initialize.", e);
208         } finally {
209             if (!selectable) {
210                 try {
211                     destroy();
212                 } catch (Exception e) {
213                     ExceptionMonitor.getInstance().exceptionCaught(e);
214                 }
215             }
216         }
217     }
218 
219     /**
220      * Initialize the polling system, will be called at construction time.
221      * @throws Exception any exception thrown by the underlying system calls  
222      */
223     protected abstract void init() throws Exception;
224 
225     /**
226      * Destroy the polling system, will be called when this {@link IoAcceptor}
227      * implementation will be disposed.  
228      * @throws Exception any exception thrown by the underlying systems calls
229      */
230     protected abstract void destroy() throws Exception;
231 
232     /**
233      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
234      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
235      * @return The number of sockets having got incoming client
236      * @throws Exception any exception thrown by the underlying systems calls
237      */
238     protected abstract int select() throws Exception;
239 
240     /**
241      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
242      */
243     protected abstract void wakeup();
244 
245     /**
246      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
247      *  during the last {@link #select()} call.
248      * @return the list of server handles ready
249      */
250     protected abstract Iterator<H> selectedHandles();
251 
252     /**
253      * Open a server socket for a given local address.
254      * @param localAddress the associated local address
255      * @return the opened server socket
256      * @throws Exception any exception thrown by the underlying systems calls
257      */
258     protected abstract H open(SocketAddress localAddress) throws Exception;
259 
260     /**
261      * Get the local address associated with a given server socket
262      * @param handle the server socket
263      * @return the local {@link SocketAddress} associated with this handle
264      * @throws Exception any exception thrown by the underlying systems calls
265      */
266     protected abstract SocketAddress localAddress(H handle) throws Exception;
267 
268     /**
269      * Accept a client connection for a server socket and return a new {@link IoSession}
270      * associated with the given {@link IoProcessor}
271      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}  
272      * @param handle the server handle
273      * @return the created {@link IoSession}
274      * @throws Exception any exception thrown by the underlying systems calls
275      */
276     protected abstract T accept(IoProcessor<T> processor, H handle)
277             throws Exception;
278 
279     /**
280      * Close a server socket.
281      * @param handle the server socket
282      * @throws Exception any exception thrown by the underlying systems calls
283      */
284     protected abstract void close(H handle) throws Exception;
285 
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     protected IoFuture dispose0() throws Exception {
291         unbind();
292         if (!disposalFuture.isDone()) {
293             startupAcceptor();
294             wakeup();
295         }
296         return disposalFuture;
297     }
298 
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     protected final Set<SocketAddress> bindInternal(
304             List<? extends SocketAddress> localAddresses) throws Exception {
305         // Create a bind request as a Future operation. When the selector
306         // have handled the registration, it will signal this future.
307         AcceptorOperationFuture request = new AcceptorOperationFuture(
308                 localAddresses);
309 
310         // adds the Registration request to the queue for the Workers
311         // to handle
312         registerQueue.add(request);
313 
314         // creates the Acceptor instance and has the local
315         // executor kick it off.
316         startupAcceptor();
317         
318         // As we just started the acceptor, we have to unblock the select()
319         // in order to process the bind request we just have added to the 
320         // registerQueue.
321         wakeup();
322         
323         // Now, we wait until this request is completed.
324         request.awaitUninterruptibly();
325 
326         if (request.getException() != null) {
327             throw request.getException();
328         }
329 
330         // Update the local addresses.
331         // setLocalAddresses() shouldn't be called from the worker thread
332         // because of deadlock.
333         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
334         
335         for (H handle:boundHandles.values()) {
336             newLocalAddresses.add(localAddress(handle));
337         }
338 
339         return newLocalAddresses;
340     }
341 
342     /**
343      * This method is called by the doBind() and doUnbind()
344      * methods.  If the acceptor is null, the acceptor object will
345      * be created and kicked off by the executor.  If the acceptor
346      * object is null, probably already created and this class
347      * is now working, then nothing will happen and the method
348      * will just return.
349      */
350     private void startupAcceptor() {
351         // If the acceptor is not ready, clear the queues
352         // TODO : they should already be clean : do we have to do that ?
353         if (!selectable) {
354             registerQueue.clear();
355             cancelQueue.clear();
356         }
357 
358         // start the acceptor if not already started
359         synchronized (lock) {
360             if (acceptor == null) {
361                 acceptor = new Acceptor();
362                 executeWorker(acceptor);
363             }
364         }
365     }
366 
367     /**
368      * {@inheritDoc}
369      */
370     @Override
371     protected final void unbind0(List<? extends SocketAddress> localAddresses)
372             throws Exception {
373         AcceptorOperationFuture future = new AcceptorOperationFuture(
374                 localAddresses);
375 
376         cancelQueue.add(future);
377         startupAcceptor();
378         wakeup();
379 
380         future.awaitUninterruptibly();
381         if (future.getException() != null) {
382             throw future.getException();
383         }
384     }
385 
386     /**
387      * This class is called by the startupAcceptor() method and is
388      * placed into a NamePreservingRunnable class.
389      * It's a thread accepting incoming connections from clients.
390      * The loop is stopped when all the bound handlers are unbound.
391      */
392     private class Acceptor implements Runnable {
393         public void run() {
394             int nHandles = 0;
395 
396             while (selectable) {
397                 try {
398                     // Detect if we have some keys ready to be processed
399                     // The select() will be woke up if some new connection
400                     // have occurred, or if the selector has been explicitly
401                     // woke up
402                     int selected = select();
403 
404                     // this actually sets the selector to OP_ACCEPT,
405                     // and binds to the port on which this class will
406                     // listen on
407                     nHandles += registerHandles();
408 
409                     if (selected > 0) {
410                         // We have some connection request, let's process 
411                         // them here. 
412                         processHandles(selectedHandles());
413                     }
414 
415                     // check to see if any cancellation request has been made.
416                     nHandles -= unregisterHandles();
417 
418                     // Now, if the number of registred handles is 0, we can
419                     // quit the loop: we don't have any socket listening
420                     // for incoming connection.
421                     if (nHandles == 0) {
422                         synchronized (lock) {
423                             if (registerQueue.isEmpty()
424                                     && cancelQueue.isEmpty()) {
425                                 acceptor = null;
426                                 break;
427                             }
428                         }
429                     }
430                 } catch (Throwable e) {
431                     ExceptionMonitor.getInstance().exceptionCaught(e);
432 
433                     try {
434                         Thread.sleep(1000);
435                     } catch (InterruptedException e1) {
436                         ExceptionMonitor.getInstance().exceptionCaught(e1);
437                     }
438                 }
439             }
440 
441             // Cleanup all the processors, and shutdown the acceptor.
442             if (selectable && isDisposing()) {
443                 selectable = false;
444                 try {
445                     if (createdProcessor) {
446                         processor.dispose();
447                     }
448                 } finally {
449                     try {
450                         synchronized (disposalLock) {
451                             if (isDisposing()) {
452                                 destroy();
453                             }
454                         }
455                     } catch (Exception e) {
456                         ExceptionMonitor.getInstance().exceptionCaught(e);
457                     } finally {
458                         disposalFuture.setDone();
459                     }
460                 }
461             }
462         }
463 
464         /**
465          * This method will process new sessions for the Worker class.  All
466          * keys that have had their status updates as per the Selector.selectedKeys()
467          * method will be processed here.  Only keys that are ready to accept
468          * connections are handled here.
469          * <p/>
470          * Session objects are created by making new instances of SocketSessionImpl
471          * and passing the session object to the SocketIoProcessor class.
472          */
473         @SuppressWarnings("unchecked")
474         private void processHandles(Iterator<H> handles) throws Exception {
475             while (handles.hasNext()) {
476                 H handle = handles.next();
477                 handles.remove();
478 
479                 // Associates a new created connection to a processor,
480                 // and get back a session
481                 T session = accept(processor, handle);
482                 
483                 if (session == null) {
484                     break;
485                 }
486 
487                 initSession(session, null, null);
488 
489                 // add the session to the SocketIoProcessor
490                 session.getProcessor().add(session);
491             }
492         }
493     }
494 
495     /**
496      * Sets up the socket communications.  Sets items such as:
497      * <p/>
498      * Blocking
499      * Reuse address
500      * Receive buffer size
501      * Bind to listen port
502      * Registers OP_ACCEPT for selector
503      */
504     private int registerHandles() {
505         for (;;) {
506             // The register queue contains the list of services to manage
507             // in this acceptor.
508             AcceptorOperationFuture future = registerQueue.poll();
509             
510             if (future == null) {
511                 return 0;
512             }
513 
514             // We create a temporary map to store the bound handles,
515             // as we may have to remove them all if there is an exception
516             // during the sockets opening.
517             Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
518             List<SocketAddress> localAddresses = future.getLocalAddresses();
519 
520             try {
521                 // Process all the addresses
522                 for (SocketAddress a : localAddresses) {
523                     H handle = open(a);
524                     newHandles.put(localAddress(handle), handle);
525                 }
526 
527                 // Everything went ok, we can now update the map storing
528                 // all the bound sockets.
529                 boundHandles.putAll(newHandles);
530 
531                 // and notify.
532                 future.setDone();
533                 return newHandles.size();
534             } catch (Exception e) {
535                 // We store the exception in the future
536                 future.setException(e);
537             } finally {
538                 // Roll back if failed to bind all addresses.
539                 if (future.getException() != null) {
540                     for (H handle : newHandles.values()) {
541                         try {
542                             close(handle);
543                         } catch (Exception e) {
544                             ExceptionMonitor.getInstance().exceptionCaught(e);
545                         }
546                     }
547                     
548                     // TODO : add some comment : what is the wakeup() waking up ?
549                     wakeup();
550                 }
551             }
552         }
553     }
554 
555     /**
556      * This method just checks to see if anything has been placed into the
557      * cancellation queue.  The only thing that should be in the cancelQueue
558      * is CancellationRequest objects and the only place this happens is in
559      * the doUnbind() method.
560      */
561     private int unregisterHandles() {
562         int cancelledHandles = 0;
563         for (;;) {
564             AcceptorOperationFuture future = cancelQueue.poll();
565             if (future == null) {
566                 break;
567             }
568 
569             // close the channels
570             for (SocketAddress a : future.getLocalAddresses()) {
571                 H handle = boundHandles.remove(a);
572                 if (handle == null) {
573                     continue;
574                 }
575 
576                 try {
577                     close(handle);
578                     wakeup(); // wake up again to trigger thread death
579                 } catch (Throwable e) {
580                     ExceptionMonitor.getInstance().exceptionCaught(e);
581                 } finally {
582                     cancelledHandles++;
583                 }
584             }
585 
586             future.setDone();
587         }
588 
589         return cancelledHandles;
590     }
591 
592     /**
593      * {@inheritDoc}
594      */
595     public final IoSession newSession(SocketAddress remoteAddress,
596             SocketAddress localAddress) {
597         throw new UnsupportedOperationException();
598     }
599 }