View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.util.Iterator;
25  import java.util.Queue;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.Executors;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.filterchain.IoFilter;
32  import org.apache.mina.core.future.ConnectFuture;
33  import org.apache.mina.core.future.DefaultConnectFuture;
34  import org.apache.mina.core.future.IoFuture;
35  import org.apache.mina.core.service.AbstractIoConnector;
36  import org.apache.mina.core.service.IoConnector;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.SimpleIoProcessorPool;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.session.IoSessionInitializer;
44  import org.apache.mina.transport.socket.nio.NioSocketConnector;
45  import org.apache.mina.util.ExceptionMonitor;
46  
47  /**
48   * A base class for implementing client transport using a polling strategy. The
49   * underlying sockets will be checked in an active loop and woke up when an
50   * socket needed to be processed. This class handle the logic behind binding,
51   * connecting and disposing the client sockets. A {@link Executor} will be used
52   * for running client connection, and an {@link AbstractPollingIoProcessor} will
53   * be used for processing connected client I/O operations like reading, writing
54   * and closing.
55   * 
56   * All the low level methods for binding, connecting, closing need to be
57   * provided by the subclassing implementation.
58   * 
59   * @see NioSocketConnector for a example of implementation
60   * 
61   * @author The Apache MINA Project (dev@mina.apache.org)
62   */
63  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
64          extends AbstractIoConnector {
65  
66      private final Object lock = new Object();
67      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
68      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
69      private final IoProcessor<T> processor;
70      private final boolean createdProcessor;
71  
72      private final ServiceOperationFuture disposalFuture =
73          new ServiceOperationFuture();
74      private volatile boolean selectable;
75      
76      /** The connector thread */
77      private Connector connector;
78  
79      /**
80       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
81       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
82       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
83       * pool size will be used.
84       * 
85       * @see SimpleIoProcessorPool
86       * 
87       * @param sessionConfig
88       *            the default configuration for the managed {@link IoSession}
89       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
90       *            type.
91       */
92      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
93          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
94      }
95  
96      /**
97       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
98       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
99       * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
100      * systems.
101      * 
102      * @see SimpleIoProcessorPool
103      * 
104      * @param sessionConfig
105      *            the default configuration for the managed {@link IoSession}
106      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
107      *            type.
108      * @param processorCount the amount of processor to instantiate for the pool
109      */
110     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
111         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
112     }
113 
114     /**
115      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
116      * session configuration, a default {@link Executor} will be created using
117      * {@link Executors#newCachedThreadPool()}.
118      * 
119      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
120      * 
121      * @param sessionConfig
122      *            the default configuration for the managed {@link IoSession}
123      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
124      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
125      */
126     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
127         this(sessionConfig, null, processor, false);
128     }
129 
130     /**
131      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
132      * session configuration and an {@link Executor} for handling I/O events. If
133      * null {@link Executor} is provided, a default one will be created using
134      * {@link Executors#newCachedThreadPool()}.
135      * 
136      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
137      * 
138      * @param sessionConfig
139      *            the default configuration for the managed {@link IoSession}
140      * @param executor
141      *            the {@link Executor} used for handling asynchronous execution of I/O
142      *            events. Can be <code>null</code>.
143      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
144      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
145      */
146     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
147         this(sessionConfig, executor, processor, false);
148     }
149 
150     /**
151      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
152      * session configuration and an {@link Executor} for handling I/O events. If
153      * null {@link Executor} is provided, a default one will be created using
154      * {@link Executors#newCachedThreadPool()}.
155      * 
156      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
157      * 
158      * @param sessionConfig
159      *            the default configuration for the managed {@link IoSession}
160      * @param executor
161      *            the {@link Executor} used for handling asynchronous execution of I/O
162      *            events. Can be <code>null</code>.
163      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
164      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
165      * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed 
166      */
167     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
168         super(sessionConfig, executor);
169 
170         if (processor == null) {
171             throw new NullPointerException("processor");
172         }
173 
174         this.processor = processor;
175         this.createdProcessor = createdProcessor;
176 
177         try {
178             init();
179             selectable = true;
180         } catch (RuntimeException e){
181             throw e;
182         } catch (Exception e) {
183             throw new RuntimeIoException("Failed to initialize.", e);
184         } finally {
185             if (!selectable) {
186                 try {
187                     destroy();
188                 } catch (Exception e) {
189                     ExceptionMonitor.getInstance().exceptionCaught(e);
190                 }
191             }
192         }
193     }
194 
195     /**
196      * Initialize the polling system, will be called at construction time.
197      * @throws Exception any exception thrown by the underlying system calls  
198      */
199     protected abstract void init() throws Exception;
200 
201     /**
202      * Destroy the polling system, will be called when this {@link IoConnector}
203      * implementation will be disposed.  
204      * @throws Exception any exception thrown by the underlying systems calls
205      */
206     protected abstract void destroy() throws Exception;
207     
208     /**
209      * Create a new client socket handle from a local {@link SocketAddress}
210      * @param localAddress the socket address for binding the new client socket 
211      * @return a new client socket handle 
212      * @throws Exception any exception thrown by the underlying systems calls
213      */
214     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
215     
216     /**
217      * Connect a newly created client socket handle to a remote {@link SocketAddress}.
218      * This operation is non-blocking, so at end of the call the socket can be still in connection
219      * process.
220      * @param handle the client socket handle
221      * @param remoteAddress the remote address where to connect
222      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket 
223      *         is in non-blocking mode and the connection operation is in progress
224      * @throws Exception
225      */
226     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
227     
228     /**
229      * Finish the connection process of a client socket after it was marked as ready to process
230      * by the {@link #select(int)} call. The socket will be connected or reported as connection
231      * failed.
232      * @param handle the client socket handle to finsh to connect
233      * @return true if the socket is connected
234      * @throws Exception any exception thrown by the underlying systems calls
235      */
236     protected abstract boolean finishConnect(H handle) throws Exception;
237     
238     /**
239      * Create a new {@link IoSession} from a connected socket client handle.
240      * Will assign the created {@link IoSession} to the given {@link IoProcessor} for
241      * managing future I/O events.
242      * @param processor the processor in charge of this session
243      * @param handle the newly connected client socket handle
244      * @return a new {@link IoSession}
245      * @throws Exception any exception thrown by the underlying systems calls
246      */
247     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
248 
249     /**
250      * Close a client socket.
251      * @param handle the client socket
252      * @throws Exception any exception thrown by the underlying systems calls
253      */
254     protected abstract void close(H handle) throws Exception;
255     
256     /**
257      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
258      */
259     protected abstract void wakeup();
260     
261     /**
262      * Check for connected sockets, interrupt when at least a connection is processed (connected or
263      * failed to connect). All the client socket descriptors processed need to be returned by 
264      * {@link #selectedHandles()}
265      * @return The number of socket having received some data
266      * @throws Exception any exception thrown by the underlying systems calls
267      */
268     protected abstract int select(int timeout) throws Exception;
269     
270     /**
271      * {@link Iterator} for the set of client sockets found connected or 
272      * failed to connect during the last {@link #select()} call.
273      * @return the list of client socket handles to process
274      */
275     protected abstract Iterator<H> selectedHandles();
276     
277     /**
278      * {@link Iterator} for all the client sockets polled for connection.
279      * @return the list of client sockets currently polled for connection
280      */
281     protected abstract Iterator<H> allHandles();
282     
283     /**
284      * Register a new client socket for connection, add it to connection polling
285      * @param handle client socket handle 
286      * @param request the associated {@link ConnectionRequest}
287      * @throws Exception any exception thrown by the underlying systems calls
288      */
289     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
290     
291     /**
292      * get the {@link ConnectionRequest} for a given client socket handle
293      * @param handle the socket client handle 
294      * @return the connection request if the socket is connecting otherwise <code>null</code>
295      */
296     protected abstract ConnectionRequest getConnectionRequest(H handle);
297 
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     protected final IoFuture dispose0() throws Exception {
303         if (!disposalFuture.isDone()) {
304             startupWorker();
305             wakeup();
306         }
307         return disposalFuture;
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     @SuppressWarnings("unchecked")
315     protected final ConnectFuture connect0(
316             SocketAddress remoteAddress, SocketAddress localAddress,
317             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
318         H handle = null;
319         boolean success = false;
320         try {
321             handle = newHandle(localAddress);
322             if (connect(handle, remoteAddress)) {
323                 ConnectFuture future = new DefaultConnectFuture();
324                 T session = newSession(processor, handle);
325                 initSession(session, future, sessionInitializer);
326                 // Forward the remaining process to the IoProcessor.
327                 session.getProcessor().add(session);
328                 success = true;
329                 return future;
330             }
331 
332             success = true;
333         } catch (Exception e) {
334             return DefaultConnectFuture.newFailedFuture(e);
335         } finally {
336             if (!success && handle != null) {
337                 try {
338                     close(handle);
339                 } catch (Exception e) {
340                     ExceptionMonitor.getInstance().exceptionCaught(e);
341                 }
342             }
343         }
344 
345         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
346         connectQueue.add(request);
347         startupWorker();
348         wakeup();
349 
350         return request;
351     }
352 
353     private void startupWorker() {
354         if (!selectable) {
355             connectQueue.clear();
356             cancelQueue.clear();
357         }
358 
359         synchronized (lock) {
360             if (connector == null) {
361                 connector = new Connector();
362                 executeWorker(connector);
363             }
364         }
365     }
366 
367     private int registerNew() {
368         int nHandles = 0;
369         for (; ;) {
370             ConnectionRequest req = connectQueue.poll();
371             if (req == null) {
372                 break;
373             }
374 
375             H handle = req.handle;
376             try {
377                 register(handle, req);
378                 nHandles ++;
379             } catch (Exception e) {
380                 req.setException(e);
381                 try {
382                     close(handle);
383                 } catch (Exception e2) {
384                     ExceptionMonitor.getInstance().exceptionCaught(e2);
385                 }
386             }
387         }
388         return nHandles;
389     }
390 
391     private int cancelKeys() {
392         int nHandles = 0;
393         for (; ;) {
394             ConnectionRequest req = cancelQueue.poll();
395             if (req == null) {
396                 break;
397             }
398 
399             H handle = req.handle;
400             try {
401                 close(handle);
402             } catch (Exception e) {
403                 ExceptionMonitor.getInstance().exceptionCaught(e);
404             } finally {
405                 nHandles ++;
406             }
407         }
408         return nHandles;
409     }
410 
411     /**
412      * Process the incoming connections, creating a new session for each
413      * valid connection. 
414      */
415     private int processConnections(Iterator<H> handlers) {
416         int nHandles = 0;
417         
418         // Loop on each connection request
419         while (handlers.hasNext()) {
420             H handle = handlers.next();
421             handlers.remove();
422 
423             ConnectionRequest connectionRequest = getConnectionRequest(handle);
424             
425             if ( connectionRequest == null) {
426                 continue;
427             }
428             
429             boolean success = false;
430             try {
431                 if (finishConnect(handle)) {
432                     T session = newSession(processor, handle);
433                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
434                     // Forward the remaining process to the IoProcessor.
435                     session.getProcessor().add(session);
436                     nHandles ++;
437                 }
438                 success = true;
439             } catch (Throwable e) {
440                 connectionRequest.setException(e);
441             } finally {
442                 if (!success) {
443                     // The connection failed, we have to cancel it.
444                     cancelQueue.offer(connectionRequest);
445                 }
446             }
447         }
448         return nHandles;
449     }
450 
451     private void processTimedOutSessions(Iterator<H> handles) {
452         long currentTime = System.currentTimeMillis();
453 
454         while (handles.hasNext()) {
455             H handle = handles.next();
456             ConnectionRequest connectionRequest = getConnectionRequest(handle);
457 
458             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
459                 connectionRequest.setException(
460                         new ConnectException("Connection timed out."));
461                 cancelQueue.offer(connectionRequest);
462             }
463         }
464     }
465 
466     private class Connector implements Runnable {
467 
468         public void run() {
469             int nHandles = 0;
470             while (selectable) {
471                 try {
472                     // the timeout for select shall be smaller of the connect
473                     // timeout or 1 second...
474                     int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);
475                     int selected = select(timeout);
476 
477                     nHandles += registerNew();
478 
479                     if (selected > 0) {
480                         nHandles -= processConnections(selectedHandles());
481                     }
482 
483                     processTimedOutSessions(allHandles());
484 
485                     nHandles -= cancelKeys();
486 
487                     if (nHandles == 0) {
488                         synchronized (lock) {
489                             if (connectQueue.isEmpty()) {
490                                 connector = null;
491                                 break;
492                             }
493                         }
494                     }
495                 } catch (Throwable e) {
496                     ExceptionMonitor.getInstance().exceptionCaught(e);
497 
498                     try {
499                         Thread.sleep(1000);
500                     } catch (InterruptedException e1) {
501                         ExceptionMonitor.getInstance().exceptionCaught(e1);
502                     }
503                 }
504             }
505 
506             if (selectable && isDisposing()) {
507                 selectable = false;
508                 try {
509                     if (createdProcessor) {
510                         processor.dispose();
511                     }
512                 } finally {
513                     try {
514                         synchronized (disposalLock) {
515                             if (isDisposing()) {
516                                 destroy();
517                             }
518                         }
519                     } catch (Exception e) {
520                         ExceptionMonitor.getInstance().exceptionCaught(e);
521                     } finally {
522                         disposalFuture.setDone();
523                     }
524                 }
525             }
526         }
527     }
528 
529     public final class ConnectionRequest extends DefaultConnectFuture {
530         private final H handle;
531         private final long deadline;
532         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
533 
534         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
535             this.handle = handle;
536             long timeout = getConnectTimeoutMillis();
537             if (timeout <= 0L) {
538                 this.deadline = Long.MAX_VALUE;
539             } else {
540                 this.deadline = System.currentTimeMillis() + timeout;
541             }
542             this.sessionInitializer = callback;
543         }
544 
545         public H getHandle() {
546             return handle;
547         }
548 
549         public long getDeadline() {
550             return deadline;
551         }
552 
553         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
554             return sessionInitializer;
555         }
556 
557         @Override
558         public void cancel() {
559             if ( !isDone() ) {
560                 super.cancel();
561                 cancelQueue.add(this);
562                 startupWorker();
563                 wakeup();
564             }
565         }
566     }
567 }