View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.util.Iterator;
25  import java.util.Queue;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.Executors;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.filterchain.IoFilter;
32  import org.apache.mina.core.future.ConnectFuture;
33  import org.apache.mina.core.future.DefaultConnectFuture;
34  import org.apache.mina.core.future.IoFuture;
35  import org.apache.mina.core.service.AbstractIoConnector;
36  import org.apache.mina.core.service.IoConnector;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.SimpleIoProcessorPool;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.session.IoSessionInitializer;
44  import org.apache.mina.util.ExceptionMonitor;
45  
46  /**
47   * A base class for implementing client transport using a polling strategy. The
48   * underlying sockets will be checked in an active loop and woke up when an
49   * socket needed to be processed. This class handle the logic behind binding,
50   * connecting and disposing the client sockets. A {@link Executor} will be used
51   * for running client connection, and an {@link AbstractPollingIoProcessor} will
52   * be used for processing connected client I/O operations like reading, writing
53   * and closing.
54   * 
55   * All the low level methods for binding, connecting, closing need to be
56   * provided by the subclassing implementation.
57   * 
58   * @see NioSocketConnector for a example of implementation
59   * 
60   * @author The Apache MINA Project (dev@mina.apache.org)
61   * @version $Rev: 678335 $, $Date: 2008-06-28 23:27:00 +0200 (sam., 28 juin
62   *          2008) $
63   */
64  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H>
65          extends AbstractIoConnector {
66  
67      private final Object lock = new Object();
68      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
69      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
70      private final IoProcessor<T> processor;
71      private final boolean createdProcessor;
72  
73      private final ServiceOperationFuture disposalFuture =
74          new ServiceOperationFuture();
75      private volatile boolean selectable;
76      private Worker worker;
77  
78      /**
79       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
80       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
81       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
82       * pool size will be used.
83       * 
84       * @see SimpleIoProcessorPool
85       * 
86       * @param sessionConfig
87       *            the default configuration for the managed {@link IoSession}
88       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
89       *            type.
90       */
91      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
92          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
93      }
94  
95      /**
96       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
97       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
98       * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
99       * systems.
100      * 
101      * @see SimpleIoProcessorPool
102      * 
103      * @param sessionConfig
104      *            the default configuration for the managed {@link IoSession}
105      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
106      *            type.
107      * @param processorCount the amount of processor to instantiate for the pool
108      */
109     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
110         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
111     }
112 
113     /**
114      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
115      * session configuration, a default {@link Executor} will be created using
116      * {@link Executors#newCachedThreadPool()}.
117      * 
118      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
119      * 
120      * @param sessionConfig
121      *            the default configuration for the managed {@link IoSession}
122      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
123      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
124      */
125     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
126         this(sessionConfig, null, processor, false);
127     }
128 
129     /**
130      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
131      * session configuration and an {@link Executor} for handling I/O events. If
132      * null {@link Executor} is provided, a default one will be created using
133      * {@link Executors#newCachedThreadPool()}.
134      * 
135      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
136      * 
137      * @param sessionConfig
138      *            the default configuration for the managed {@link IoSession}
139      * @param executor
140      *            the {@link Executor} used for handling asynchronous execution of I/O
141      *            events. Can be <code>null</code>.
142      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
143      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
144      */
145     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
146         this(sessionConfig, executor, processor, false);
147     }
148 
149     /**
150      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
151      * session configuration and an {@link Executor} for handling I/O events. If
152      * null {@link Executor} is provided, a default one will be created using
153      * {@link Executors#newCachedThreadPool()}.
154      * 
155      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
156      * 
157      * @param sessionConfig
158      *            the default configuration for the managed {@link IoSession}
159      * @param executor
160      *            the {@link Executor} used for handling asynchronous execution of I/O
161      *            events. Can be <code>null</code>.
162      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
163      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
164      * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed 
165      */
166     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
167         super(sessionConfig, executor);
168 
169         if (processor == null) {
170             throw new NullPointerException("processor");
171         }
172 
173         this.processor = processor;
174         this.createdProcessor = createdProcessor;
175 
176         try {
177             init();
178             selectable = true;
179         } catch (RuntimeException e){
180             throw e;
181         } catch (Exception e) {
182             throw new RuntimeIoException("Failed to initialize.", e);
183         } finally {
184             if (!selectable) {
185                 try {
186                     destroy();
187                 } catch (Exception e) {
188                     ExceptionMonitor.getInstance().exceptionCaught(e);
189                 }
190             }
191         }
192     }
193 
194     /**
195      * Initialize the polling system, will be called at construction time.
196      * @throws Exception any exception thrown by the underlying system calls  
197      */
198     protected abstract void init() throws Exception;
199 
200     /**
201      * Destroy the polling system, will be called when this {@link IoConnector}
202      * implementation will be disposed.  
203      * @throws Exception any exception thrown by the underlying systems calls
204      */
205     protected abstract void destroy() throws Exception;
206     
207     /**
208      * Create a new client socket handle from a local {@link SocketAddress}
209      * @param localAddress the socket address for binding the new client socket 
210      * @return a new client socket handle 
211      * @throws Exception any exception thrown by the underlying systems calls
212      */
213     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
214     
215     /**
216      * Connect a newly created client socket handle to a remote {@link SocketAddress}.
217      * This operation is non-blocking, so at end of the call the socket can be still in connection
218      * process.
219      * @param handle the client socket handle
220      * @param remoteAddress the remote address where to connect
221      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket 
222      *         is in non-blocking mode and the connection operation is in progress
223      * @throws Exception
224      */
225     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
226     
227     /**
228      * Finish the connection process of a client socket after it was marked as ready to process
229      * by the {@link #select(int)} call. The socket will be connected or reported as connection
230      * failed.
231      * @param handle the client socket handle to finsh to connect
232      * @return true if the socket is connected
233      * @throws Exception any exception thrown by the underlying systems calls
234      */
235     protected abstract boolean finishConnect(H handle) throws Exception;
236     
237     /**
238      * Create a new {@link IoSession} from a connected socket client handle.
239      * Will assign the created {@link IoSession} to the given {@link IoProcessor} for
240      * managing future I/O events.
241      * @param processor the processor in charge of this session
242      * @param handle the newly connected client socket handle
243      * @return a new {@link IoSession}
244      * @throws Exception any exception thrown by the underlying systems calls
245      */
246     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
247 
248     /**
249      * Close a client socket.
250      * @param handle the client socket
251      * @throws Exception any exception thrown by the underlying systems calls
252      */
253     protected abstract void close(H handle) throws Exception;
254     
255     /**
256      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
257      */
258     protected abstract void wakeup();
259     
260     /**
261      * Check for connected sockets, interrupt when at least a connection is processed (connected or
262      * failed to connect). All the client socket descriptors processed need to be returned by 
263      * {@link #selectedHandles()}
264      * @return true if one server socket have got incoming client
265      * @throws Exception any exception thrown by the underlying systems calls
266      */
267     protected abstract boolean select(int timeout) throws Exception;
268     
269     /**
270      * {@link Iterator} for the set of client sockets found connected or 
271      * failed to connect during the last {@link #select()} call.
272      * @return the list of client socket handles to process
273      */
274     protected abstract Iterator<H> selectedHandles();
275     
276     /**
277      * {@link Iterator} for all the client sockets polled for connection.
278      * @return the list of client sockets currently polled for connection
279      */
280     protected abstract Iterator<H> allHandles();
281     
282     /**
283      * Register a new client socket for connection, add it to connection polling
284      * @param handle client socket handle 
285      * @param request the associated {@link ConnectionRequest}
286      * @throws Exception any exception thrown by the underlying systems calls
287      */
288     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
289     
290     /**
291      * get the {@link ConnectionRequest} for a given client socket handle
292      * @param handle the socket client handle 
293      * @return the connection request if the socket is connecting otherwise <code>null</code>
294      */
295     protected abstract ConnectionRequest connectionRequest(H handle);
296 
297     /**
298      * {@inheritDoc}
299      */
300     @Override
301     protected final IoFuture dispose0() throws Exception {
302         if (!disposalFuture.isDone()) {
303             startupWorker();
304             wakeup();
305         }
306         return disposalFuture;
307     }
308 
309     /**
310      * {@inheritDoc}
311      */
312     @Override
313     @SuppressWarnings("unchecked")
314     protected final ConnectFuture connect0(
315             SocketAddress remoteAddress, SocketAddress localAddress,
316             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
317         H handle = null;
318         boolean success = false;
319         try {
320             handle = newHandle(localAddress);
321             if (connect(handle, remoteAddress)) {
322                 ConnectFuture future = new DefaultConnectFuture();
323                 T session = newSession(processor, handle);
324                 finishSessionInitialization(session, future, sessionInitializer);
325                 // Forward the remaining process to the IoProcessor.
326                 session.getProcessor().add(session);
327                 success = true;
328                 return future;
329             }
330 
331             success = true;
332         } catch (Exception e) {
333             return DefaultConnectFuture.newFailedFuture(e);
334         } finally {
335             if (!success && handle != null) {
336                 try {
337                     close(handle);
338                 } catch (Exception e) {
339                     ExceptionMonitor.getInstance().exceptionCaught(e);
340                 }
341             }
342         }
343 
344         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
345         connectQueue.add(request);
346         startupWorker();
347         wakeup();
348 
349         return request;
350     }
351 
352     private void startupWorker() {
353         if (!selectable) {
354             connectQueue.clear();
355             cancelQueue.clear();
356         }
357 
358         synchronized (lock) {
359             if (worker == null) {
360                 worker = new Worker();
361                 executeWorker(worker);
362             }
363         }
364     }
365 
366     private int registerNew() {
367         int nHandles = 0;
368         for (; ;) {
369             ConnectionRequest req = connectQueue.poll();
370             if (req == null) {
371                 break;
372             }
373 
374             H handle = req.handle;
375             try {
376                 register(handle, req);
377                 nHandles ++;
378             } catch (Exception e) {
379                 req.setException(e);
380                 try {
381                     close(handle);
382                 } catch (Exception e2) {
383                     ExceptionMonitor.getInstance().exceptionCaught(e2);
384                 }
385             }
386         }
387         return nHandles;
388     }
389 
390     private int cancelKeys() {
391         int nHandles = 0;
392         for (; ;) {
393             ConnectionRequest req = cancelQueue.poll();
394             if (req == null) {
395                 break;
396             }
397 
398             H handle = req.handle;
399             try {
400                 close(handle);
401             } catch (Exception e) {
402                 ExceptionMonitor.getInstance().exceptionCaught(e);
403             } finally {
404                 nHandles ++;
405             }
406         }
407         return nHandles;
408     }
409 
410     @SuppressWarnings("unchecked")
411     private int processSessions(Iterator<H> handlers) {
412         int nHandles = 0;
413         while (handlers.hasNext()) {
414             H handle = handlers.next();
415             handlers.remove();
416 
417             ConnectionRequest entry = connectionRequest(handle);
418             boolean success = false;
419             try {
420                 if (finishConnect(handle)) {
421                     T session = newSession(processor, handle);
422                     finishSessionInitialization(session, entry, entry.getSessionInitializer());
423                     // Forward the remaining process to the IoProcessor.
424                     session.getProcessor().add(session);
425                     nHandles ++;
426                 }
427                 success = true;
428             } catch (Throwable e) {
429                 entry.setException(e);
430             } finally {
431                 if (!success) {
432                     cancelQueue.offer(entry);
433                 }
434             }
435         }
436         return nHandles;
437     }
438 
439     private void processTimedOutSessions(Iterator<H> handles) {
440         long currentTime = System.currentTimeMillis();
441 
442         while (handles.hasNext()) {
443             H handle = handles.next();
444             ConnectionRequest entry = connectionRequest(handle);
445 
446             if (currentTime >= entry.deadline) {
447                 entry.setException(
448                         new ConnectException("Connection timed out."));
449                 cancelQueue.offer(entry);
450             }
451         }
452     }
453 
454     private class Worker implements Runnable {
455 
456         public void run() {
457             int nHandles = 0;
458             while (selectable) {
459                 try {
460                     // the timeout for select shall be smaller of the connect
461                     // timeout or 1 second...
462                     int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);
463                     boolean selected = select(timeout);
464 
465                     nHandles += registerNew();
466 
467                     if (selected) {
468                         nHandles -= processSessions(selectedHandles());
469                     }
470 
471                     processTimedOutSessions(allHandles());
472 
473                     nHandles -= cancelKeys();
474 
475                     if (nHandles == 0) {
476                         synchronized (lock) {
477                             if (connectQueue.isEmpty()) {
478                                 worker = null;
479                                 break;
480                             }
481                         }
482                     }
483                 } catch (Throwable e) {
484                     ExceptionMonitor.getInstance().exceptionCaught(e);
485 
486                     try {
487                         Thread.sleep(1000);
488                     } catch (InterruptedException e1) {
489                         ExceptionMonitor.getInstance().exceptionCaught(e1);
490                     }
491                 }
492             }
493 
494             if (selectable && isDisposing()) {
495                 selectable = false;
496                 try {
497                     if (createdProcessor) {
498                         processor.dispose();
499                     }
500                 } finally {
501                     try {
502                         synchronized (disposalLock) {
503                             if (isDisposing()) {
504                                 destroy();
505                             }
506                         }
507                     } catch (Exception e) {
508                         ExceptionMonitor.getInstance().exceptionCaught(e);
509                     } finally {
510                         disposalFuture.setDone();
511                     }
512                 }
513             }
514         }
515     }
516 
517     public final class ConnectionRequest extends DefaultConnectFuture {
518         private final H handle;
519         private final long deadline;
520         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
521 
522         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
523             this.handle = handle;
524             long timeout = getConnectTimeoutMillis();
525             if (timeout <= 0L) {
526                 this.deadline = Long.MAX_VALUE;
527             } else {
528                 this.deadline = System.currentTimeMillis() + timeout;
529             }
530             this.sessionInitializer = callback;
531         }
532 
533         public H getHandle() {
534             return handle;
535         }
536 
537         public long getDeadline() {
538             return deadline;
539         }
540 
541         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
542             return sessionInitializer;
543         }
544 
545         @Override
546         public void cancel() {
547             super.cancel();
548             cancelQueue.add(this);
549             startupWorker();
550             wakeup();
551         }
552     }
553 }