001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.nio;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.net.ServerSocket;
025import java.net.SocketAddress;
026import java.nio.channels.ClosedSelectorException;
027import java.nio.channels.SelectionKey;
028import java.nio.channels.Selector;
029import java.nio.channels.ServerSocketChannel;
030import java.nio.channels.SocketChannel;
031import java.nio.channels.spi.SelectorProvider;
032import java.util.Collection;
033import java.util.Iterator;
034import java.util.concurrent.Executor;
035
036import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
037import org.apache.mina.core.service.IoAcceptor;
038import org.apache.mina.core.service.IoProcessor;
039import org.apache.mina.core.service.IoService;
040import org.apache.mina.core.service.SimpleIoProcessorPool;
041import org.apache.mina.core.service.TransportMetadata;
042import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
043import org.apache.mina.transport.socket.SocketAcceptor;
044
045/**
046 * {@link IoAcceptor} for socket transport (TCP/IP).  This class
047 * handles incoming TCP/IP based socket connections.
048 *
049 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
050 */
051public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
052implements SocketAcceptor {
053
054    private volatile Selector selector;
055    private volatile SelectorProvider selectorProvider = null;
056
057    /**
058     * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
059     */
060    public NioSocketAcceptor() {
061        super(new DefaultSocketSessionConfig(), NioProcessor.class);
062        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
063    }
064
065    /**
066     * Constructor for {@link NioSocketAcceptor} using default parameters, and
067     * given number of {@link NioProcessor} for multithreading I/O operations.
068     * 
069     * @param processorCount the number of processor to create and place in a
070     * {@link SimpleIoProcessorPool}
071     */
072    public NioSocketAcceptor(int processorCount) {
073        super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
074        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
075    }
076
077    /**
078     *  Constructor for {@link NioSocketAcceptor} with default configuration but a
079     *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
080     *  {@link IoService} of the same type.
081     * @param processor the processor to use for managing I/O events
082     */
083    public NioSocketAcceptor(IoProcessor<NioSession> processor) {
084        super(new DefaultSocketSessionConfig(), processor);
085        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
086    }
087
088    /**
089     *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
090     *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
091     *  sharing the same processor and executor over multiple {@link IoService} of the same type.
092     * @param executor the executor for connection
093     * @param processor the processor for I/O operations
094     */
095    public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
096        super(new DefaultSocketSessionConfig(), executor, processor);
097        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
098    }
099
100    /**
101     * Constructor for {@link NioSocketAcceptor} using default parameters, and
102     * given number of {@link NioProcessor} for multithreading I/O operations, and
103     * a custom SelectorProvider for NIO
104     *
105     * @param processorCount the number of processor to create and place in a
106     * @param selectorProvider teh SelectorProvider to use
107     * {@link SimpleIoProcessorPool}
108     */
109    public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
110        super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
111        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
112        this.selectorProvider = selectorProvider;
113    }
114
115    /**
116     * {@inheritDoc}
117     */
118    @Override
119    protected void init() throws Exception {
120        selector = Selector.open();
121    }
122
123    /**
124     * {@inheritDoc}
125     */
126    @Override
127    protected void init(SelectorProvider selectorProvider) throws Exception {
128        this.selectorProvider = selectorProvider;
129
130        if (selectorProvider == null) {
131            selector = Selector.open();
132        } else {
133            selector = selectorProvider.openSelector();
134        }
135    }
136
137    /**
138     * {@inheritDoc}
139     */
140    @Override
141    protected void destroy() throws Exception {
142        if (selector != null) {
143            selector.close();
144        }
145    }
146
147    /**
148     * {@inheritDoc}
149     */
150    public TransportMetadata getTransportMetadata() {
151        return NioSocketSession.METADATA;
152    }
153
154    /**
155     * {@inheritDoc}
156     */
157    @Override
158    public InetSocketAddress getLocalAddress() {
159        return (InetSocketAddress) super.getLocalAddress();
160    }
161
162    /**
163     * {@inheritDoc}
164     */
165    @Override
166    public InetSocketAddress getDefaultLocalAddress() {
167        return (InetSocketAddress) super.getDefaultLocalAddress();
168    }
169
170    /**
171     * {@inheritDoc}
172     */
173    public void setDefaultLocalAddress(InetSocketAddress localAddress) {
174        setDefaultLocalAddress((SocketAddress) localAddress);
175    }
176
177    /**
178     * {@inheritDoc}
179     */
180    @Override
181    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
182
183        SelectionKey key = null;
184
185        if (handle != null) {
186            key = handle.keyFor(selector);
187        }
188
189        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
190            return null;
191        }
192
193        // accept the connection from the client
194        SocketChannel ch = handle.accept();
195
196        if (ch == null) {
197            return null;
198        }
199
200        return new NioSocketSession(this, processor, ch);
201    }
202
203    /**
204     * {@inheritDoc}
205     */
206    @Override
207    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
208        // Creates the listening ServerSocket
209
210        ServerSocketChannel channel = null;
211
212        if (selectorProvider != null) {
213            channel = selectorProvider.openServerSocketChannel();
214        } else {
215            channel = ServerSocketChannel.open();
216        }
217
218        boolean success = false;
219
220        try {
221            // This is a non blocking socket channel
222            channel.configureBlocking(false);
223
224            // Configure the server socket,
225            ServerSocket socket = channel.socket();
226
227            // Set the reuseAddress flag accordingly with the setting
228            socket.setReuseAddress(isReuseAddress());
229
230            // and bind.
231            try {
232                socket.bind(localAddress, getBacklog());
233            } catch (IOException ioe) {
234                // Add some info regarding the address we try to bind to the
235                // message
236                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
237                        + ioe.getMessage();
238                Exception e = new IOException(newMessage);
239                e.initCause(ioe.getCause());
240
241                // And close the channel
242                channel.close();
243
244                throw e;
245            }
246
247            // Register the channel within the selector for ACCEPT event
248            channel.register(selector, SelectionKey.OP_ACCEPT);
249            success = true;
250        } finally {
251            if (!success) {
252                close(channel);
253            }
254        }
255        return channel;
256    }
257
258    /**
259     * {@inheritDoc}
260     */
261    @Override
262    protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
263        return handle.socket().getLocalSocketAddress();
264    }
265
266    /**
267     * Check if we have at least one key whose corresponding channels is
268     * ready for I/O operations.
269     *
270     * This method performs a blocking selection operation.
271     * It returns only after at least one channel is selected,
272     * this selector's wakeup method is invoked, or the current thread
273     * is interrupted, whichever comes first.
274     * 
275     * @return The number of keys having their ready-operation set updated
276     * @throws IOException If an I/O error occurs
277     * @throws ClosedSelectorException If this selector is closed
278     */
279    @Override
280    protected int select() throws Exception {
281        return selector.select();
282    }
283
284    /**
285     * {@inheritDoc}
286     */
287    @Override
288    protected Iterator<ServerSocketChannel> selectedHandles() {
289        return new ServerSocketChannelIterator(selector.selectedKeys());
290    }
291
292    /**
293     * {@inheritDoc}
294     */
295    @Override
296    protected void close(ServerSocketChannel handle) throws Exception {
297        SelectionKey key = handle.keyFor(selector);
298
299        if (key != null) {
300            key.cancel();
301        }
302
303        handle.close();
304    }
305
306    /**
307     * {@inheritDoc}
308     */
309    @Override
310    protected void wakeup() {
311        selector.wakeup();
312    }
313
314    /**
315     * Defines an iterator for the selected-key Set returned by the
316     * selector.selectedKeys(). It replaces the SelectionKey operator.
317     */
318    private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
319        /** The selected-key iterator */
320        private final Iterator<SelectionKey> iterator;
321
322        /**
323         * Build a SocketChannel iterator which will return a SocketChannel instead of
324         * a SelectionKey.
325         * 
326         * @param selectedKeys The selector selected-key set
327         */
328        private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
329            iterator = selectedKeys.iterator();
330        }
331
332        /**
333         * Tells if there are more SockectChannel left in the iterator
334         * @return <tt>true</tt> if there is at least one more
335         * SockectChannel object to read
336         */
337        public boolean hasNext() {
338            return iterator.hasNext();
339        }
340
341        /**
342         * Get the next SocketChannel in the operator we have built from
343         * the selected-key et for this selector.
344         * 
345         * @return The next SocketChannel in the iterator
346         */
347        public ServerSocketChannel next() {
348            SelectionKey key = iterator.next();
349
350            if (key.isValid() && key.isAcceptable()) {
351                return (ServerSocketChannel) key.channel();
352            }
353
354            return null;
355        }
356
357        /**
358         * Remove the current SocketChannel from the iterator
359         */
360        public void remove() {
361            iterator.remove();
362        }
363    }
364}