001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.nio;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.net.SocketAddress;
025import java.nio.channels.SelectionKey;
026import java.nio.channels.Selector;
027import java.nio.channels.SocketChannel;
028import java.util.Collection;
029import java.util.Iterator;
030import java.util.concurrent.Executor;
031
032import org.apache.mina.core.polling.AbstractPollingIoConnector;
033import org.apache.mina.core.service.IoConnector;
034import org.apache.mina.core.service.IoProcessor;
035import org.apache.mina.core.service.IoService;
036import org.apache.mina.core.service.SimpleIoProcessorPool;
037import org.apache.mina.core.service.TransportMetadata;
038import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
039import org.apache.mina.transport.socket.SocketConnector;
040import org.apache.mina.transport.socket.SocketSessionConfig;
041
042/**
043 * {@link IoConnector} for socket transport (TCP/IP).
044 *
045 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
046 */
047public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
048SocketConnector {
049
050    private volatile Selector selector;
051
052    /**
053     * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
054     */
055    public NioSocketConnector() {
056        super(new DefaultSocketSessionConfig(), NioProcessor.class);
057        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
058    }
059
060    /**
061     * Constructor for {@link NioSocketConnector} with default configuration, and
062     * given number of {@link NioProcessor} for multithreading I/O operations
063     * @param processorCount the number of processor to create and place in a
064     * {@link SimpleIoProcessorPool}
065     */
066    public NioSocketConnector(int processorCount) {
067        super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
068        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
069    }
070
071    /**
072     *  Constructor for {@link NioSocketConnector} with default configuration but a
073     *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
074     *  {@link IoService} of the same type.
075     * @param processor the processor to use for managing I/O events
076     */
077    public NioSocketConnector(IoProcessor<NioSession> processor) {
078        super(new DefaultSocketSessionConfig(), processor);
079        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
080    }
081
082    /**
083     *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling
084     *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
085     *  the same processor and executor over multiple {@link IoService} of the same type.
086     * @param executor the executor for connection
087     * @param processor the processor for I/O operations
088     */
089    public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
090        super(new DefaultSocketSessionConfig(), executor, processor);
091        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
092    }
093
094    /**
095     * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
096     * thread pool executor to manage the given number of processor instances. The processor class must have
097     * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
098     * no-arg constructor.
099     * 
100     * @param processorClass the processor class.
101     * @param processorCount the number of processors to instantiate.
102     * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
103     * @since 2.0.0-M4
104     */
105    public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
106        super(new DefaultSocketSessionConfig(), processorClass, processorCount);
107    }
108
109    /**
110     * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in
111     * thread pool executor to manage the default number of processor instances. The processor class must have
112     * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
113     * no-arg constructor. The default number of instances is equal to the number of processor cores
114     * in the system, plus one.
115     * 
116     * @param processorClass the processor class.
117     * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
118     * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
119     * @since 2.0.0-M4
120     */
121    public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
122        super(new DefaultSocketSessionConfig(), processorClass);
123    }
124
125    /**
126     * {@inheritDoc}
127     */
128    @Override
129    protected void init() throws Exception {
130        this.selector = Selector.open();
131    }
132
133    /**
134     * {@inheritDoc}
135     */
136    @Override
137    protected void destroy() throws Exception {
138        if (selector != null) {
139            selector.close();
140        }
141    }
142
143    /**
144     * {@inheritDoc}
145     */
146    public TransportMetadata getTransportMetadata() {
147        return NioSocketSession.METADATA;
148    }
149
150    /**
151     * {@inheritDoc}
152     */
153    public SocketSessionConfig getSessionConfig() {
154        return (SocketSessionConfig) sessionConfig;
155    }
156
157    /**
158     * {@inheritDoc}
159     */
160    @Override
161    public InetSocketAddress getDefaultRemoteAddress() {
162        return (InetSocketAddress) super.getDefaultRemoteAddress();
163    }
164
165    /**
166     * {@inheritDoc}
167     */
168    public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
169        super.setDefaultRemoteAddress(defaultRemoteAddress);
170    }
171
172    /**
173     * {@inheritDoc}
174     */
175    @Override
176    protected Iterator<SocketChannel> allHandles() {
177        return new SocketChannelIterator(selector.keys());
178    }
179
180    /**
181     * {@inheritDoc}
182     */
183    @Override
184    protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
185        return handle.connect(remoteAddress);
186    }
187
188    /**
189     * {@inheritDoc}
190     */
191    @Override
192    protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
193        SelectionKey key = handle.keyFor(selector);
194
195        if ((key == null) || (!key.isValid())) {
196            return null;
197        }
198
199        return (ConnectionRequest) key.attachment();
200    }
201
202    /**
203     * {@inheritDoc}
204     */
205    @Override
206    protected void close(SocketChannel handle) throws Exception {
207        SelectionKey key = handle.keyFor(selector);
208
209        if (key != null) {
210            key.cancel();
211        }
212
213        handle.close();
214    }
215
216    /**
217     * {@inheritDoc}
218     */
219    @Override
220    protected boolean finishConnect(SocketChannel handle) throws Exception {
221        if (handle.finishConnect()) {
222            SelectionKey key = handle.keyFor(selector);
223
224            if (key != null) {
225                key.cancel();
226            }
227
228            return true;
229        }
230
231        return false;
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    @Override
238    protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
239        SocketChannel ch = SocketChannel.open();
240
241        int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
242
243        if (receiveBufferSize > 65535) {
244            ch.socket().setReceiveBufferSize(receiveBufferSize);
245        }
246
247        if (localAddress != null) {
248            try {
249                ch.socket().bind(localAddress);
250            } catch (IOException ioe) {
251                // Add some info regarding the address we try to bind to the
252                // message
253                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
254                        + ioe.getMessage();
255                Exception e = new IOException(newMessage);
256                e.initCause(ioe.getCause());
257
258                // Preemptively close the channel
259                ch.close();
260                throw e;
261            }
262        }
263
264        ch.configureBlocking(false);
265
266        return ch;
267    }
268
269    /**
270     * {@inheritDoc}
271     */
272    @Override
273    protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
274        return new NioSocketSession(this, processor, handle);
275    }
276
277    /**
278     * {@inheritDoc}
279     */
280    @Override
281    protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
282        handle.register(selector, SelectionKey.OP_CONNECT, request);
283    }
284
285    /**
286     * {@inheritDoc}
287     */
288    @Override
289    protected int select(int timeout) throws Exception {
290        return selector.select(timeout);
291    }
292
293    /**
294     * {@inheritDoc}
295     */
296    @Override
297    protected Iterator<SocketChannel> selectedHandles() {
298        return new SocketChannelIterator(selector.selectedKeys());
299    }
300
301    /**
302     * {@inheritDoc}
303     */
304    @Override
305    protected void wakeup() {
306        selector.wakeup();
307    }
308
309    private static class SocketChannelIterator implements Iterator<SocketChannel> {
310
311        private final Iterator<SelectionKey> i;
312
313        private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
314            this.i = selectedKeys.iterator();
315        }
316
317        /**
318         * {@inheritDoc}
319         */
320        public boolean hasNext() {
321            return i.hasNext();
322        }
323
324        /**
325         * {@inheritDoc}
326         */
327        public SocketChannel next() {
328            SelectionKey key = i.next();
329            return (SocketChannel) key.channel();
330        }
331
332        /**
333         * {@inheritDoc}
334         */
335        public void remove() {
336            i.remove();
337        }
338    }
339}