View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.SocketChannel;
28  import java.util.Collection;
29  import java.util.Iterator;
30  import java.util.concurrent.Executor;
31  
32  import org.apache.mina.core.polling.AbstractPollingIoConnector;
33  import org.apache.mina.core.service.IoConnector;
34  import org.apache.mina.core.service.IoProcessor;
35  import org.apache.mina.core.service.IoService;
36  import org.apache.mina.core.service.SimpleIoProcessorPool;
37  import org.apache.mina.core.service.TransportMetadata;
38  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39  import org.apache.mina.transport.socket.SocketConnector;
40  import org.apache.mina.transport.socket.SocketSessionConfig;
41  
42  /**
43   * {@link IoConnector} for socket transport (TCP/IP).
44   *
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   */
47  public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
48  SocketConnector {
49  
50      private volatile Selector selector;
51  
52      /**
53       * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
54       */
55      public NioSocketConnector() {
56          super(new DefaultSocketSessionConfig(), NioProcessor.class);
57          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58      }
59  
60      /**
61       * Constructor for {@link NioSocketConnector} with default configuration, and
62       * given number of {@link NioProcessor} for multithreading I/O operations
63       * @param processorCount the number of processor to create and place in a
64       * {@link SimpleIoProcessorPool}
65       */
66      public NioSocketConnector(int processorCount) {
67          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69      }
70  
71      /**
72       *  Constructor for {@link NioSocketConnector} with default configuration but a
73       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
74       *  {@link IoService} of the same type.
75       * @param processor the processor to use for managing I/O events
76       */
77      public NioSocketConnector(IoProcessor<NioSession> processor) {
78          super(new DefaultSocketSessionConfig(), processor);
79          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80      }
81  
82      /**
83       *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling
84       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
85       *  the same processor and executor over multiple {@link IoService} of the same type.
86       * @param executor the executor for connection
87       * @param processor the processor for I/O operations
88       */
89      public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90          super(new DefaultSocketSessionConfig(), executor, processor);
91          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92      }
93  
94      /**
95       * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
96       * thread pool executor to manage the given number of processor instances. The processor class must have
97       * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
98       * no-arg constructor.
99       * 
100      * @param processorClass the processor class.
101      * @param processorCount the number of processors to instantiate.
102      * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
103      * @since 2.0.0-M4
104      */
105     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
106         super(new DefaultSocketSessionConfig(), processorClass, processorCount);
107     }
108 
109     /**
110      * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in
111      * thread pool executor to manage the default number of processor instances. The processor class must have
112      * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
113      * no-arg constructor. The default number of instances is equal to the number of processor cores
114      * in the system, plus one.
115      * 
116      * @param processorClass the processor class.
117      * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
118      * @since 2.0.0-M4
119      */
120     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
121         super(new DefaultSocketSessionConfig(), processorClass);
122     }
123 
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     protected void init() throws Exception {
129         this.selector = Selector.open();
130     }
131 
132     /**
133      * {@inheritDoc}
134      */
135     @Override
136     protected void destroy() throws Exception {
137         if (selector != null) {
138             selector.close();
139         }
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     public TransportMetadata getTransportMetadata() {
147         return NioSocketSession.METADATA;
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     public SocketSessionConfig getSessionConfig() {
155         return (SocketSessionConfig) sessionConfig;
156     }
157 
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public InetSocketAddress getDefaultRemoteAddress() {
163         return (InetSocketAddress) super.getDefaultRemoteAddress();
164     }
165 
166     /**
167      * {@inheritDoc}
168      */
169     @Override
170     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
171         super.setDefaultRemoteAddress(defaultRemoteAddress);
172     }
173 
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     protected Iterator<SocketChannel> allHandles() {
179         return new SocketChannelIterator(selector.keys());
180     }
181 
182     /**
183      * {@inheritDoc}
184      */
185     @Override
186     protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
187         return handle.connect(remoteAddress);
188     }
189 
190     /**
191      * {@inheritDoc}
192      */
193     @Override
194     protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
195         SelectionKey key = handle.keyFor(selector);
196 
197         if ((key == null) || (!key.isValid())) {
198             return null;
199         }
200 
201         return (ConnectionRequest) key.attachment();
202     }
203 
204     /**
205      * {@inheritDoc}
206      */
207     @Override
208     protected void close(SocketChannel handle) throws Exception {
209         SelectionKey key = handle.keyFor(selector);
210 
211         if (key != null) {
212             key.cancel();
213         }
214 
215         handle.close();
216     }
217 
218     /**
219      * {@inheritDoc}
220      */
221     @Override
222     protected boolean finishConnect(SocketChannel handle) throws Exception {
223         if (handle.finishConnect()) {
224             SelectionKey key = handle.keyFor(selector);
225 
226             if (key != null) {
227                 key.cancel();
228             }
229 
230             return true;
231         }
232 
233         return false;
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
241         SocketChannel ch = SocketChannel.open();
242 
243         int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
244 
245         if (receiveBufferSize > 65535) {
246             ch.socket().setReceiveBufferSize(receiveBufferSize);
247         }
248 
249         if (localAddress != null) {
250             try {
251                 ch.socket().bind(localAddress);
252             } catch (IOException ioe) {
253                 // Add some info regarding the address we try to bind to the
254                 // message
255                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
256                         + ioe.getMessage();
257                 Exception e = new IOException(newMessage);
258                 e.initCause(ioe.getCause());
259 
260                 // Preemptively close the channel
261                 ch.close();
262                 throw e;
263             }
264         }
265 
266         ch.configureBlocking(false);
267 
268         return ch;
269     }
270 
271     /**
272      * {@inheritDoc}
273      */
274     @Override
275     protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
276         return new NioSocketSession(this, processor, handle);
277     }
278 
279     /**
280      * {@inheritDoc}
281      */
282     @Override
283     protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
284         handle.register(selector, SelectionKey.OP_CONNECT, request);
285     }
286 
287     /**
288      * {@inheritDoc}
289      */
290     @Override
291     protected int select(int timeout) throws Exception {
292         return selector.select(timeout);
293     }
294 
295     /**
296      * {@inheritDoc}
297      */
298     @Override
299     protected Iterator<SocketChannel> selectedHandles() {
300         return new SocketChannelIterator(selector.selectedKeys());
301     }
302 
303     /**
304      * {@inheritDoc}
305      */
306     @Override
307     protected void wakeup() {
308         selector.wakeup();
309     }
310 
311     private static class SocketChannelIterator implements Iterator<SocketChannel> {
312 
313         private final Iterator<SelectionKey> i;
314 
315         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
316             this.i = selectedKeys.iterator();
317         }
318 
319         /**
320          * {@inheritDoc}
321          */
322         @Override
323         public boolean hasNext() {
324             return i.hasNext();
325         }
326 
327         /**
328          * {@inheritDoc}
329          */
330         @Override
331         public SocketChannel next() {
332             SelectionKey key = i.next();
333             return (SocketChannel) key.channel();
334         }
335 
336         /**
337          * {@inheritDoc}
338          */
339         @Override
340         public void remove() {
341             i.remove();
342         }
343     }
344 }