View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.mina.core.RuntimeIoException;
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.future.DefaultConnectFuture;
36  import org.apache.mina.core.service.AbstractIoConnector;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoConnector;
39  import org.apache.mina.core.service.IoHandler;
40  import org.apache.mina.core.service.IoProcessor;
41  import org.apache.mina.core.service.SimpleIoProcessorPool;
42  import org.apache.mina.core.session.AbstractIoSession;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.session.IoSessionConfig;
45  import org.apache.mina.core.session.IoSessionInitializer;
46  import org.apache.mina.transport.socket.nio.NioSocketConnector;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing client transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * connecting and disposing the client sockets. A {@link Executor} will be used
54   * for running client connection, and an {@link AbstractPollingIoProcessor} will
55   * be used for processing connected client I/O operations like reading, writing
56   * and closing.
57   * 
58   * All the low level methods for binding, connecting, closing need to be
59   * provided by the subclassing implementation.
60   * 
61   * @see NioSocketConnector for a example of implementation
62   * @param <H> The type of IoHandler
63   * @param <S> The type of IoSession
64   * 
65   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
66   */
67  public abstract class AbstractPollingIoConnector<S extends AbstractIoSession, H> extends AbstractIoConnector {
68  
69      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<>();
70  
71      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<>();
72  
73      private final IoProcessor<S> processor;
74  
75      private final boolean createdProcessor;
76  
77      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
78  
79      private volatile boolean selectable;
80  
81      /** The connector thread */
82      private final AtomicReference<Connector> connectorRef = new AtomicReference<>();
83  
84      /**
85       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
86       * default session configuration, a class of {@link IoProcessor} which will
87       * be instantiated in a {@link SimpleIoProcessorPool} for better scaling in
88       * multiprocessor systems. The default pool size will be used.
89       * 
90       * @see SimpleIoProcessorPool
91       * 
92       * @param sessionConfig
93       *            the default configuration for the managed {@link IoSession}
94       * @param processorClass
95       *            a {@link Class} of {@link IoProcessor} for the associated
96       *            {@link IoSession} type.
97       */
98      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
99          this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
100     }
101 
102     /**
103      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
104      * default session configuration, a class of {@link IoProcessor} which will
105      * be instantiated in a {@link SimpleIoProcessorPool} for using multiple
106      * thread for better scaling in multiprocessor systems.
107      * 
108      * @see SimpleIoProcessorPool
109      * 
110      * @param sessionConfig
111      *            the default configuration for the managed {@link IoSession}
112      * @param processorClass
113      *            a {@link Class} of {@link IoProcessor} for the associated
114      *            {@link IoSession} type.
115      * @param processorCount
116      *            the amount of processor to instantiate for the pool
117      */
118     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
119             int processorCount) {
120         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
121     }
122 
123     /**
124      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
125      * default session configuration, a default {@link Executor} will be created
126      * using {@link Executors#newCachedThreadPool()}.
127      * 
128      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
129      * 
130      * @param sessionConfig
131      *            the default configuration for the managed {@link IoSession}
132      * @param processor
133      *            the {@link IoProcessor} for processing the {@link IoSession}
134      *            of this transport, triggering events to the bound
135      *            {@link IoHandler} and processing the chains of
136      *            {@link IoFilter}
137      */
138     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
139         this(sessionConfig, null, processor, false);
140     }
141 
142     /**
143      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
144      * default session configuration and an {@link Executor} for handling I/O
145      * events. If null {@link Executor} is provided, a default one will be
146      * created using {@link Executors#newCachedThreadPool()}.
147      * 
148      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
149      * 
150      * @param sessionConfig
151      *            the default configuration for the managed {@link IoSession}
152      * @param executor
153      *            the {@link Executor} used for handling asynchronous execution
154      *            of I/O events. Can be <code>null</code>.
155      * @param processor
156      *            the {@link IoProcessor} for processing the {@link IoSession}
157      *            of this transport, triggering events to the bound
158      *            {@link IoHandler} and processing the chains of
159      *            {@link IoFilter}
160      */
161     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
162         this(sessionConfig, executor, processor, false);
163     }
164 
165     /**
166      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
167      * default session configuration and an {@link Executor} for handling I/O
168      * events. If null {@link Executor} is provided, a default one will be
169      * created using {@link Executors#newCachedThreadPool()}.
170      * 
171      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
172      * 
173      * @param sessionConfig
174      *            the default configuration for the managed {@link IoSession}
175      * @param executor
176      *            the {@link Executor} used for handling asynchronous execution
177      *            of I/O events. Can be <code>null</code>.
178      * @param processor
179      *            the {@link IoProcessor} for processing the {@link IoSession}
180      *            of this transport, triggering events to the bound
181      *            {@link IoHandler} and processing the chains of
182      *            {@link IoFilter}
183      * @param createdProcessor
184      *            tagging the processor as automatically created, so it will be
185      *            automatically disposed
186      */
187     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
188             boolean createdProcessor) {
189         super(sessionConfig, executor);
190 
191         if (processor == null) {
192             throw new IllegalArgumentException("processor");
193         }
194 
195         this.processor = processor;
196         this.createdProcessor = createdProcessor;
197 
198         try {
199             init();
200             selectable = true;
201         } catch (RuntimeException e) {
202             throw e;
203         } catch (Exception e) {
204             throw new RuntimeIoException("Failed to initialize.", e);
205         } finally {
206             if (!selectable) {
207                 try {
208                     destroy();
209                 } catch (Exception e) {
210                     ExceptionMonitor.getInstance().exceptionCaught(e);
211                 }
212             }
213         }
214     }
215 
216     /**
217      * Initialize the polling system, will be called at construction time.
218      * 
219      * @throws Exception
220      *             any exception thrown by the underlying system calls
221      */
222     protected abstract void init() throws Exception;
223 
224     /**
225      * Destroy the polling system, will be called when this {@link IoConnector}
226      * implementation will be disposed.
227      * 
228      * @throws Exception
229      *             any exception thrown by the underlying systems calls
230      */
231     protected abstract void destroy() throws Exception;
232 
233     /**
234      * Create a new client socket handle from a local {@link SocketAddress}
235      * 
236      * @param localAddress
237      *            the socket address for binding the new client socket
238      * @return a new client socket handle
239      * @throws Exception
240      *             any exception thrown by the underlying systems calls
241      */
242     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
243 
244     /**
245      * Connect a newly created client socket handle to a remote
246      * {@link SocketAddress}. This operation is non-blocking, so at end of the
247      * call the socket can be still in connection process.
248      * 
249      * @param handle the client socket handle
250      * @param remoteAddress the remote address where to connect
251      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if
252      *         this client socket is in non-blocking mode and the connection
253      *         operation is in progress
254      * @throws Exception If the connect failed
255      */
256     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
257 
258     /**
259      * Finish the connection process of a client socket after it was marked as
260      * ready to process by the {@link #select(int)} call. The socket will be
261      * connected or reported as connection failed.
262      * 
263      * @param handle
264      *            the client socket handle to finish to connect
265      * @return true if the socket is connected
266      * @throws Exception
267      *             any exception thrown by the underlying systems calls
268      */
269     protected abstract boolean finishConnect(H handle) throws Exception;
270 
271     /**
272      * Create a new {@link IoSession} from a connected socket client handle.
273      * Will assign the created {@link IoSession} to the given
274      * {@link IoProcessor} for managing future I/O events.
275      * 
276      * @param processor
277      *            the processor in charge of this session
278      * @param handle
279      *            the newly connected client socket handle
280      * @return a new {@link IoSession}
281      * @throws Exception
282      *             any exception thrown by the underlying systems calls
283      */
284     protected abstract S newSession(IoProcessor<S> processor, H handle) throws Exception;
285 
286     /**
287      * Close a client socket.
288      * 
289      * @param handle
290      *            the client socket
291      * @throws Exception
292      *             any exception thrown by the underlying systems calls
293      */
294     protected abstract void close(H handle) throws Exception;
295 
296     /**
297      * Interrupt the {@link #select(int)} method. Used when the poll set need to
298      * be modified.
299      */
300     protected abstract void wakeup();
301 
302     /**
303      * Check for connected sockets, interrupt when at least a connection is
304      * processed (connected or failed to connect). All the client socket
305      * descriptors processed need to be returned by {@link #selectedHandles()}
306      * 
307      * @param timeout The timeout for the select() method
308      * @return The number of socket having received some data
309      * @throws Exception any exception thrown by the underlying systems calls
310      */
311     protected abstract int select(int timeout) throws Exception;
312 
313     /**
314      * {@link Iterator} for the set of client sockets found connected or failed
315      * to connect during the last {@link #select(int)} call.
316      * 
317      * @return the list of client socket handles to process
318      */
319     protected abstract Iterator<H> selectedHandles();
320 
321     /**
322      * {@link Iterator} for all the client sockets polled for connection.
323      * 
324      * @return the list of client sockets currently polled for connection
325      */
326     protected abstract Iterator<H> allHandles();
327 
328     /**
329      * Register a new client socket for connection, add it to connection polling
330      * 
331      * @param handle
332      *            client socket handle
333      * @param request
334      *            the associated {@link ConnectionRequest}
335      * @throws Exception
336      *             any exception thrown by the underlying systems calls
337      */
338     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
339 
340     /**
341      * get the {@link ConnectionRequest} for a given client socket handle
342      * 
343      * @param handle
344      *            the socket client handle
345      * @return the connection request if the socket is connecting otherwise
346      *         <code>null</code>
347      */
348     protected abstract ConnectionRequest getConnectionRequest(H handle);
349 
350     /**
351      * {@inheritDoc}
352      */
353     @Override
354     protected final void dispose0() throws Exception {
355         startupWorker();
356         wakeup();
357     }
358 
359     /**
360      * {@inheritDoc}
361      */
362     @Override
363     @SuppressWarnings("unchecked")
364     protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
365             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
366         H handle = null;
367         boolean success = false;
368         try {
369             handle = newHandle(localAddress);
370             if (connect(handle, remoteAddress)) {
371                 ConnectFuture future = new DefaultConnectFuture();
372                 S session = newSession(processor, handle);
373                 initSession(session, future, sessionInitializer);
374                 // Forward the remaining process to the IoProcessor.
375                 session.getProcessor().add(session);
376                 success = true;
377                 return future;
378             }
379 
380             success = true;
381         } catch (Exception e) {
382             return DefaultConnectFuture.newFailedFuture(e);
383         } finally {
384             if (!success && handle != null) {
385                 try {
386                     close(handle);
387                 } catch (Exception e) {
388                     ExceptionMonitor.getInstance().exceptionCaught(e);
389                 }
390             }
391         }
392 
393         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
394         connectQueue.add(request);
395         startupWorker();
396         wakeup();
397 
398         return request;
399     }
400 
401     private void startupWorker() {
402         if (!selectable) {
403             connectQueue.clear();
404             cancelQueue.clear();
405         }
406 
407         Connector connector = connectorRef.get();
408 
409         if (connector == null) {
410             connector = new Connector();
411 
412             if (connectorRef.compareAndSet(null, connector)) {
413                 executeWorker(connector);
414             }
415         }
416     }
417 
418     private class Connector implements Runnable {
419         /**
420          * {@inheritDoc}
421          */
422         @Override
423         public void run() {
424             assert connectorRef.get() == this;
425 
426             int nHandles = 0;
427 
428             while (selectable) {
429                 try {
430                     // the timeout for select shall be smaller of the connect
431                     // timeout or 1 second...
432                     int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
433                     int selected = select(timeout);
434 
435                     nHandles += registerNew();
436 
437                     // get a chance to get out of the connector loop, if we
438                     // don't have any more handles
439                     if (nHandles == 0) {
440                         connectorRef.set(null);
441 
442                         if (connectQueue.isEmpty()) {
443                             assert connectorRef.get() != this;
444                             break;
445                         }
446 
447                         if (!connectorRef.compareAndSet(null, this)) {
448                             assert connectorRef.get() != this;
449                             break;
450                         }
451 
452                         assert connectorRef.get() == this;
453                     }
454 
455                     if (selected > 0) {
456                         nHandles -= processConnections(selectedHandles());
457                     }
458 
459                     processTimedOutSessions(allHandles());
460 
461                     nHandles -= cancelKeys();
462                 } catch (ClosedSelectorException cse) {
463                     // If the selector has been closed, we can exit the loop
464                     ExceptionMonitor.getInstance().exceptionCaught(cse);
465                     break;
466                 } catch (Exception e) {
467                     ExceptionMonitor.getInstance().exceptionCaught(e);
468 
469                     try {
470                         Thread.sleep(1000);
471                     } catch (InterruptedException e1) {
472                         ExceptionMonitor.getInstance().exceptionCaught(e1);
473                     }
474                 }
475             }
476 
477             if (selectable && isDisposing()) {
478                 selectable = false;
479                 try {
480                     if (createdProcessor) {
481                         processor.dispose();
482                     }
483                 } finally {
484                     try {
485                         synchronized (disposalLock) {
486                             if (isDisposing()) {
487                                 destroy();
488                             }
489                         }
490                     } catch (Exception e) {
491                         ExceptionMonitor.getInstance().exceptionCaught(e);
492                     } finally {
493                         disposalFuture.setDone();
494                     }
495                 }
496             }
497         }
498 
499         private int registerNew() {
500             int nHandles = 0;
501             for (;;) {
502                 ConnectionRequest req = connectQueue.poll();
503                 if (req == null) {
504                     break;
505                 }
506 
507                 H handle = req.handle;
508                 try {
509                     register(handle, req);
510                     nHandles++;
511                 } catch (Exception e) {
512                     req.setException(e);
513                     try {
514                         close(handle);
515                     } catch (Exception e2) {
516                         ExceptionMonitor.getInstance().exceptionCaught(e2);
517                     }
518                 }
519             }
520             return nHandles;
521         }
522 
523         private int cancelKeys() {
524             int nHandles = 0;
525 
526             for (;;) {
527                 ConnectionRequest req = cancelQueue.poll();
528 
529                 if (req == null) {
530                     break;
531                 }
532 
533                 H handle = req.handle;
534 
535                 try {
536                     close(handle);
537                 } catch (Exception e) {
538                     ExceptionMonitor.getInstance().exceptionCaught(e);
539                 } finally {
540                     nHandles++;
541                 }
542             }
543 
544             if (nHandles > 0) {
545                 wakeup();
546             }
547 
548             return nHandles;
549         }
550 
551         /**
552          * Process the incoming connections, creating a new session for each valid
553          * connection.
554          */
555         private int processConnections(Iterator<H> handlers) {
556             int nHandles = 0;
557 
558             // Loop on each connection request
559             while (handlers.hasNext()) {
560                 H handle = handlers.next();
561                 handlers.remove();
562 
563                 ConnectionRequest connectionRequest = getConnectionRequest(handle);
564 
565                 if (connectionRequest == null) {
566                     continue;
567                 }
568 
569                 boolean success = false;
570                 try {
571                     if (finishConnect(handle)) {
572                         S session = newSession(processor, handle);
573                         initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
574                         // Forward the remaining process to the IoProcessor.
575                         session.getProcessor().add(session);
576                         nHandles++;
577                     }
578                     success = true;
579                 } catch (Exception e) {
580                     connectionRequest.setException(e);
581                 } finally {
582                     if (!success) {
583                         // The connection failed, we have to cancel it.
584                         cancelQueue.offer(connectionRequest);
585                     }
586                 }
587             }
588             return nHandles;
589         }
590 
591         private void processTimedOutSessions(Iterator<H> handles) {
592             long currentTime = System.currentTimeMillis();
593 
594             while (handles.hasNext()) {
595                 H handle = handles.next();
596                 ConnectionRequest connectionRequest = getConnectionRequest(handle);
597 
598                 if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
599                     connectionRequest.setException(new ConnectException("Connection timed out."));
600                     cancelQueue.offer(connectionRequest);
601                 }
602             }
603         }
604     }
605 
606     /**
607      * A ConnectionRequest's Iouture 
608      */
609     public final class ConnectionRequest extends DefaultConnectFuture {
610         /** The handle associated with this connection request */
611         private final H handle;
612 
613         /** The time up to this connection request will be valid */
614         private final long deadline;
615 
616         /** The callback to call when the session is initialized */
617         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
618 
619         /**
620          * Creates a new ConnectionRequest instance
621          * 
622          * @param handle The IoHander
623          * @param callback The IoFuture callback
624          */
625         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
626             this.handle = handle;
627             long timeout = getConnectTimeoutMillis();
628 
629             if (timeout <= 0L) {
630                 this.deadline = Long.MAX_VALUE;
631             } else {
632                 this.deadline = System.currentTimeMillis() + timeout;
633             }
634 
635             this.sessionInitializer = callback;
636         }
637 
638         /**
639          * @return The IoHandler instance
640          */
641         public H getHandle() {
642             return handle;
643         }
644 
645         /**
646          * @return The connection deadline 
647          */
648         public long getDeadline() {
649             return deadline;
650         }
651 
652         /**
653          * @return The session initializer callback
654          */
655         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
656             return sessionInitializer;
657         }
658 
659         /**
660          * {@inheritDoc}
661          */
662         @Override
663         public boolean cancel() {
664             if (!isDone()) {
665                 boolean justCancelled = super.cancel();
666 
667                 // We haven't cancelled the request before, so add the future
668                 // in the cancel queue.
669                 if (justCancelled) {
670                     cancelQueue.add(this);
671                     startupWorker();
672                     wakeup();
673                 }
674             }
675 
676             return true;
677         }
678     }
679 }