001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.polling;
021
022import java.net.ConnectException;
023import java.net.SocketAddress;
024import java.nio.channels.ClosedSelectorException;
025import java.util.Iterator;
026import java.util.Queue;
027import java.util.concurrent.ConcurrentLinkedQueue;
028import java.util.concurrent.Executor;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicReference;
031
032import org.apache.mina.core.RuntimeIoException;
033import org.apache.mina.core.filterchain.IoFilter;
034import org.apache.mina.core.future.ConnectFuture;
035import org.apache.mina.core.future.DefaultConnectFuture;
036import org.apache.mina.core.service.AbstractIoConnector;
037import org.apache.mina.core.service.AbstractIoService;
038import org.apache.mina.core.service.IoConnector;
039import org.apache.mina.core.service.IoHandler;
040import org.apache.mina.core.service.IoProcessor;
041import org.apache.mina.core.service.SimpleIoProcessorPool;
042import org.apache.mina.core.session.AbstractIoSession;
043import org.apache.mina.core.session.IoSession;
044import org.apache.mina.core.session.IoSessionConfig;
045import org.apache.mina.core.session.IoSessionInitializer;
046import org.apache.mina.transport.socket.nio.NioSocketConnector;
047import org.apache.mina.util.ExceptionMonitor;
048
049/**
050 * A base class for implementing client transport using a polling strategy. The
051 * underlying sockets will be checked in an active loop and woke up when an
052 * socket needed to be processed. This class handle the logic behind binding,
053 * connecting and disposing the client sockets. A {@link Executor} will be used
054 * for running client connection, and an {@link AbstractPollingIoProcessor} will
055 * be used for processing connected client I/O operations like reading, writing
056 * and closing.
057 * 
058 * All the low level methods for binding, connecting, closing need to be
059 * provided by the subclassing implementation.
060 * 
061 * @see NioSocketConnector for a example of implementation
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 */
065public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
066
067    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
068
069    private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
070
071    private final IoProcessor<T> processor;
072
073    private final boolean createdProcessor;
074
075    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
076
077    private volatile boolean selectable;
078
079    /** The connector thread */
080    private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
081
082    /**
083     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
084     * default session configuration, a class of {@link IoProcessor} which will
085     * be instantiated in a {@link SimpleIoProcessorPool} for better scaling in
086     * multiprocessor systems. The default pool size will be used.
087     * 
088     * @see SimpleIoProcessorPool
089     * 
090     * @param sessionConfig
091     *            the default configuration for the managed {@link IoSession}
092     * @param processorClass
093     *            a {@link Class} of {@link IoProcessor} for the associated
094     *            {@link IoSession} type.
095     */
096    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
097        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
098    }
099
100    /**
101     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
102     * default session configuration, a class of {@link IoProcessor} which will
103     * be instantiated in a {@link SimpleIoProcessorPool} for using multiple
104     * thread for better scaling in multiprocessor systems.
105     * 
106     * @see SimpleIoProcessorPool
107     * 
108     * @param sessionConfig
109     *            the default configuration for the managed {@link IoSession}
110     * @param processorClass
111     *            a {@link Class} of {@link IoProcessor} for the associated
112     *            {@link IoSession} type.
113     * @param processorCount
114     *            the amount of processor to instantiate for the pool
115     */
116    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
117            int processorCount) {
118        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
119    }
120
121    /**
122     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
123     * default session configuration, a default {@link Executor} will be created
124     * using {@link Executors#newCachedThreadPool()}.
125     * 
126     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
127     * 
128     * @param sessionConfig
129     *            the default configuration for the managed {@link IoSession}
130     * @param processor
131     *            the {@link IoProcessor} for processing the {@link IoSession}
132     *            of this transport, triggering events to the bound
133     *            {@link IoHandler} and processing the chains of
134     *            {@link IoFilter}
135     */
136    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
137        this(sessionConfig, null, processor, false);
138    }
139
140    /**
141     * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
142     * default session configuration and an {@link Executor} for handling I/O
143     * events. If null {@link Executor} is provided, a default one will be
144     * created using {@link Executors#newCachedThreadPool()}.
145     * 
146     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
147     * 
148     * @param sessionConfig
149     *            the default configuration for the managed {@link IoSession}
150     * @param executor
151     *            the {@link Executor} used for handling asynchronous execution
152     *            of I/O events. Can be <code>null</code>.
153     * @param processor
154     *            the {@link IoProcessor} for processing the {@link IoSession}
155     *            of this transport, triggering events to the bound
156     *            {@link IoHandler} and processing the chains of
157     *            {@link IoFilter}
158     */
159    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
160        this(sessionConfig, executor, processor, false);
161    }
162
163    /**
164     * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
165     * default session configuration and an {@link Executor} for handling I/O
166     * events. If null {@link Executor} is provided, a default one will be
167     * created using {@link Executors#newCachedThreadPool()}.
168     * 
169     * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
170     * 
171     * @param sessionConfig
172     *            the default configuration for the managed {@link IoSession}
173     * @param executor
174     *            the {@link Executor} used for handling asynchronous execution
175     *            of I/O events. Can be <code>null</code>.
176     * @param processor
177     *            the {@link IoProcessor} for processing the {@link IoSession}
178     *            of this transport, triggering events to the bound
179     *            {@link IoHandler} and processing the chains of
180     *            {@link IoFilter}
181     * @param createdProcessor
182     *            tagging the processor as automatically created, so it will be
183     *            automatically disposed
184     */
185    private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
186            boolean createdProcessor) {
187        super(sessionConfig, executor);
188
189        if (processor == null) {
190            throw new IllegalArgumentException("processor");
191        }
192
193        this.processor = processor;
194        this.createdProcessor = createdProcessor;
195
196        try {
197            init();
198            selectable = true;
199        } catch (RuntimeException e) {
200            throw e;
201        } catch (Exception e) {
202            throw new RuntimeIoException("Failed to initialize.", e);
203        } finally {
204            if (!selectable) {
205                try {
206                    destroy();
207                } catch (Exception e) {
208                    ExceptionMonitor.getInstance().exceptionCaught(e);
209                }
210            }
211        }
212    }
213
214    /**
215     * Initialize the polling system, will be called at construction time.
216     * 
217     * @throws Exception
218     *             any exception thrown by the underlying system calls
219     */
220    protected abstract void init() throws Exception;
221
222    /**
223     * Destroy the polling system, will be called when this {@link IoConnector}
224     * implementation will be disposed.
225     * 
226     * @throws Exception
227     *             any exception thrown by the underlying systems calls
228     */
229    protected abstract void destroy() throws Exception;
230
231    /**
232     * Create a new client socket handle from a local {@link SocketAddress}
233     * 
234     * @param localAddress
235     *            the socket address for binding the new client socket
236     * @return a new client socket handle
237     * @throws Exception
238     *             any exception thrown by the underlying systems calls
239     */
240    protected abstract H newHandle(SocketAddress localAddress) throws Exception;
241
242    /**
243     * Connect a newly created client socket handle to a remote
244     * {@link SocketAddress}. This operation is non-blocking, so at end of the
245     * call the socket can be still in connection process.
246     * 
247     * @param handle the client socket handle
248     * @param remoteAddress the remote address where to connect
249     * @return <tt>true</tt> if a connection was established, <tt>false</tt> if
250     *         this client socket is in non-blocking mode and the connection
251     *         operation is in progress
252     * @throws Exception If the connect failed
253     */
254    protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
255
256    /**
257     * Finish the connection process of a client socket after it was marked as
258     * ready to process by the {@link #select(int)} call. The socket will be
259     * connected or reported as connection failed.
260     * 
261     * @param handle
262     *            the client socket handle to finish to connect
263     * @return true if the socket is connected
264     * @throws Exception
265     *             any exception thrown by the underlying systems calls
266     */
267    protected abstract boolean finishConnect(H handle) throws Exception;
268
269    /**
270     * Create a new {@link IoSession} from a connected socket client handle.
271     * Will assign the created {@link IoSession} to the given
272     * {@link IoProcessor} for managing future I/O events.
273     * 
274     * @param processor
275     *            the processor in charge of this session
276     * @param handle
277     *            the newly connected client socket handle
278     * @return a new {@link IoSession}
279     * @throws Exception
280     *             any exception thrown by the underlying systems calls
281     */
282    protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
283
284    /**
285     * Close a client socket.
286     * 
287     * @param handle
288     *            the client socket
289     * @throws Exception
290     *             any exception thrown by the underlying systems calls
291     */
292    protected abstract void close(H handle) throws Exception;
293
294    /**
295     * Interrupt the {@link #select(int)} method. Used when the poll set need to
296     * be modified.
297     */
298    protected abstract void wakeup();
299
300    /**
301     * Check for connected sockets, interrupt when at least a connection is
302     * processed (connected or failed to connect). All the client socket
303     * descriptors processed need to be returned by {@link #selectedHandles()}
304     * 
305     * @param timeout The timeout for the select() method
306     * @return The number of socket having received some data
307     * @throws Exception any exception thrown by the underlying systems calls
308     */
309    protected abstract int select(int timeout) throws Exception;
310
311    /**
312     * {@link Iterator} for the set of client sockets found connected or failed
313     * to connect during the last {@link #select(int)} call.
314     * 
315     * @return the list of client socket handles to process
316     */
317    protected abstract Iterator<H> selectedHandles();
318
319    /**
320     * {@link Iterator} for all the client sockets polled for connection.
321     * 
322     * @return the list of client sockets currently polled for connection
323     */
324    protected abstract Iterator<H> allHandles();
325
326    /**
327     * Register a new client socket for connection, add it to connection polling
328     * 
329     * @param handle
330     *            client socket handle
331     * @param request
332     *            the associated {@link ConnectionRequest}
333     * @throws Exception
334     *             any exception thrown by the underlying systems calls
335     */
336    protected abstract void register(H handle, ConnectionRequest request) throws Exception;
337
338    /**
339     * get the {@link ConnectionRequest} for a given client socket handle
340     * 
341     * @param handle
342     *            the socket client handle
343     * @return the connection request if the socket is connecting otherwise
344     *         <code>null</code>
345     */
346    protected abstract ConnectionRequest getConnectionRequest(H handle);
347
348    /**
349     * {@inheritDoc}
350     */
351    @Override
352    protected final void dispose0() throws Exception {
353        startupWorker();
354        wakeup();
355    }
356
357    /**
358     * {@inheritDoc}
359     */
360    @Override
361    @SuppressWarnings("unchecked")
362    protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
363            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
364        H handle = null;
365        boolean success = false;
366        try {
367            handle = newHandle(localAddress);
368            if (connect(handle, remoteAddress)) {
369                ConnectFuture future = new DefaultConnectFuture();
370                T session = newSession(processor, handle);
371                initSession(session, future, sessionInitializer);
372                // Forward the remaining process to the IoProcessor.
373                session.getProcessor().add(session);
374                success = true;
375                return future;
376            }
377
378            success = true;
379        } catch (Exception e) {
380            return DefaultConnectFuture.newFailedFuture(e);
381        } finally {
382            if (!success && handle != null) {
383                try {
384                    close(handle);
385                } catch (Exception e) {
386                    ExceptionMonitor.getInstance().exceptionCaught(e);
387                }
388            }
389        }
390
391        ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
392        connectQueue.add(request);
393        startupWorker();
394        wakeup();
395
396        return request;
397    }
398
399    private void startupWorker() {
400        if (!selectable) {
401            connectQueue.clear();
402            cancelQueue.clear();
403        }
404
405        Connector connector = connectorRef.get();
406
407        if (connector == null) {
408            connector = new Connector();
409
410            if (connectorRef.compareAndSet(null, connector)) {
411                executeWorker(connector);
412            }
413        }
414    }
415
416    private int registerNew() {
417        int nHandles = 0;
418        for (;;) {
419            ConnectionRequest req = connectQueue.poll();
420            if (req == null) {
421                break;
422            }
423
424            H handle = req.handle;
425            try {
426                register(handle, req);
427                nHandles++;
428            } catch (Exception e) {
429                req.setException(e);
430                try {
431                    close(handle);
432                } catch (Exception e2) {
433                    ExceptionMonitor.getInstance().exceptionCaught(e2);
434                }
435            }
436        }
437        return nHandles;
438    }
439
440    private int cancelKeys() {
441        int nHandles = 0;
442
443        for (;;) {
444            ConnectionRequest req = cancelQueue.poll();
445
446            if (req == null) {
447                break;
448            }
449
450            H handle = req.handle;
451
452            try {
453                close(handle);
454            } catch (Exception e) {
455                ExceptionMonitor.getInstance().exceptionCaught(e);
456            } finally {
457                nHandles++;
458            }
459        }
460
461        if (nHandles > 0) {
462            wakeup();
463        }
464
465        return nHandles;
466    }
467
468    /**
469     * Process the incoming connections, creating a new session for each valid
470     * connection.
471     */
472    private int processConnections(Iterator<H> handlers) {
473        int nHandles = 0;
474
475        // Loop on each connection request
476        while (handlers.hasNext()) {
477            H handle = handlers.next();
478            handlers.remove();
479
480            ConnectionRequest connectionRequest = getConnectionRequest(handle);
481
482            if (connectionRequest == null) {
483                continue;
484            }
485
486            boolean success = false;
487            try {
488                if (finishConnect(handle)) {
489                    T session = newSession(processor, handle);
490                    initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
491                    // Forward the remaining process to the IoProcessor.
492                    session.getProcessor().add(session);
493                    nHandles++;
494                }
495                success = true;
496            } catch (Exception e) {
497                connectionRequest.setException(e);
498            } finally {
499                if (!success) {
500                    // The connection failed, we have to cancel it.
501                    cancelQueue.offer(connectionRequest);
502                }
503            }
504        }
505        return nHandles;
506    }
507
508    private void processTimedOutSessions(Iterator<H> handles) {
509        long currentTime = System.currentTimeMillis();
510
511        while (handles.hasNext()) {
512            H handle = handles.next();
513            ConnectionRequest connectionRequest = getConnectionRequest(handle);
514
515            if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
516                connectionRequest.setException(new ConnectException("Connection timed out."));
517                cancelQueue.offer(connectionRequest);
518            }
519        }
520    }
521
522    private class Connector implements Runnable {
523
524        public void run() {
525            assert (connectorRef.get() == this);
526
527            int nHandles = 0;
528
529            while (selectable) {
530                try {
531                    // the timeout for select shall be smaller of the connect
532                    // timeout or 1 second...
533                    int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
534                    int selected = select(timeout);
535
536                    nHandles += registerNew();
537
538                    // get a chance to get out of the connector loop, if we
539                    // don't have any more handles
540                    if (nHandles == 0) {
541                        connectorRef.set(null);
542
543                        if (connectQueue.isEmpty()) {
544                            assert (connectorRef.get() != this);
545                            break;
546                        }
547
548                        if (!connectorRef.compareAndSet(null, this)) {
549                            assert (connectorRef.get() != this);
550                            break;
551                        }
552
553                        assert (connectorRef.get() == this);
554                    }
555
556                    if (selected > 0) {
557                        nHandles -= processConnections(selectedHandles());
558                    }
559
560                    processTimedOutSessions(allHandles());
561
562                    nHandles -= cancelKeys();
563                } catch (ClosedSelectorException cse) {
564                    // If the selector has been closed, we can exit the loop
565                    ExceptionMonitor.getInstance().exceptionCaught(cse);
566                    break;
567                } catch (Exception e) {
568                    ExceptionMonitor.getInstance().exceptionCaught(e);
569
570                    try {
571                        Thread.sleep(1000);
572                    } catch (InterruptedException e1) {
573                        ExceptionMonitor.getInstance().exceptionCaught(e1);
574                    }
575                }
576            }
577
578            if (selectable && isDisposing()) {
579                selectable = false;
580                try {
581                    if (createdProcessor) {
582                        processor.dispose();
583                    }
584                } finally {
585                    try {
586                        synchronized (disposalLock) {
587                            if (isDisposing()) {
588                                destroy();
589                            }
590                        }
591                    } catch (Exception e) {
592                        ExceptionMonitor.getInstance().exceptionCaught(e);
593                    } finally {
594                        disposalFuture.setDone();
595                    }
596                }
597            }
598        }
599    }
600
601    public final class ConnectionRequest extends DefaultConnectFuture {
602        /** The handle associated with this connection request */
603        private final H handle;
604
605        /** The time up to this connection request will be valid */
606        private final long deadline;
607
608        /** The callback to call when the session is initialized */
609        private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
610
611        public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
612            this.handle = handle;
613            long timeout = getConnectTimeoutMillis();
614
615            if (timeout <= 0L) {
616                this.deadline = Long.MAX_VALUE;
617            } else {
618                this.deadline = System.currentTimeMillis() + timeout;
619            }
620
621            this.sessionInitializer = callback;
622        }
623
624        public H getHandle() {
625            return handle;
626        }
627
628        public long getDeadline() {
629            return deadline;
630        }
631
632        public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
633            return sessionInitializer;
634        }
635
636        @Override
637        public boolean cancel() {
638            if (!isDone()) {
639                boolean justCancelled = super.cancel();
640
641                // We haven't cancelled the request before, so add the future
642                // in the cancel queue.
643                if (justCancelled) {
644                    cancelQueue.add(this);
645                    startupWorker();
646                    wakeup();
647                }
648            }
649
650            return true;
651        }
652    }
653}