View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.Collection;
28  import java.util.Iterator;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.polling.AbstractPollingIoConnector;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoProcessor;
34  import org.apache.mina.core.service.IoService;
35  import org.apache.mina.core.service.SimpleIoProcessorPool;
36  import org.apache.mina.core.service.TransportMetadata;
37  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38  import org.apache.mina.transport.socket.SocketConnector;
39  import org.apache.mina.transport.socket.SocketSessionConfig;
40  
41  /**
42   * {@link IoConnector} for socket transport (TCP/IP).
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public final class NioSocketConnector
47          extends AbstractPollingIoConnector<NioSession, SocketChannel>
48          implements SocketConnector {
49  
50      private volatile Selector selector;
51  
52      /**
53       * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
54       */
55      public NioSocketConnector() {
56          super(new DefaultSocketSessionConfig(), NioProcessor.class);
57          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58      }
59  
60      /**
61       * Constructor for {@link NioSocketConnector} with default configuration, and 
62       * given number of {@link NioProcessor} for multithreading I/O operations
63       * @param processorCount the number of processor to create and place in a
64       * {@link SimpleIoProcessorPool} 
65       */
66      public NioSocketConnector(int processorCount) {
67          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69      }
70  
71      /**
72       *  Constructor for {@link NioSocketConnector} with default configuration but a
73       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
74       *  {@link IoService} of the same type.
75       * @param processor the processor to use for managing I/O events
76       */
77      public NioSocketConnector(IoProcessor<NioSession> processor) {
78          super(new DefaultSocketSessionConfig(), processor);
79          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80      }
81  
82      /**
83       *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling 
84       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
85       *  the same processor and executor over multiple {@link IoService} of the same type.
86       * @param executor the executor for connection
87       * @param processor the processor for I/O operations
88       */
89      public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90          super(new DefaultSocketSessionConfig(), executor, processor);
91          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92      }
93      
94      /**
95       * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in 
96       * thread pool executor to manage the given number of processor instances. The processor class must have 
97       * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
98       * no-arg constructor.
99       * 
100      * @param processorClass the processor class.
101      * @param processorCount the number of processors to instantiate.
102      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
103      * @since 2.0.0-M4
104      */
105     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass,
106             int processorCount) {
107         super(new DefaultSocketSessionConfig(), processorClass, processorCount);
108     }
109 
110     /**
111      * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in 
112      * thread pool executor to manage the default number of processor instances. The processor class must have 
113      * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
114      * no-arg constructor. The default number of instances is equal to the number of processor cores 
115      * in the system, plus one.
116      * 
117      * @param processorClass the processor class.
118      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
119      * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
120      * @since 2.0.0-M4
121      */
122     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
123         super(new DefaultSocketSessionConfig(), processorClass);
124     }
125 
126     /**
127      * {@inheritDoc}
128      */
129     @Override
130     protected void init() throws Exception {
131         this.selector = Selector.open();
132     }
133 
134     /**
135      * {@inheritDoc}
136      */
137     @Override
138     protected void destroy() throws Exception {
139         if (selector != null) {
140             selector.close();
141         }
142     }
143 
144     /**
145      * {@inheritDoc}
146      */
147     public TransportMetadata getTransportMetadata() {
148         return NioSocketSession.METADATA;
149     }
150 
151     /**
152      * {@inheritDoc}
153      */
154     @Override
155     public SocketSessionConfig getSessionConfig() {
156         return (SocketSessionConfig) super.getSessionConfig();
157     }
158     
159     /**
160      * {@inheritDoc}
161      */
162     @Override
163     public InetSocketAddress getDefaultRemoteAddress() {
164         return (InetSocketAddress) super.getDefaultRemoteAddress();
165     }
166     
167     /**
168      * {@inheritDoc}
169      */
170     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
171         super.setDefaultRemoteAddress(defaultRemoteAddress);
172     }
173 
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     protected Iterator<SocketChannel> allHandles() {
179         return new SocketChannelIterator(selector.keys());
180     }
181 
182     /**
183      * {@inheritDoc}
184      */
185     @Override
186     protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
187             throws Exception {
188         return handle.connect(remoteAddress);
189     }
190 
191     /**
192      * {@inheritDoc}
193      */
194     @Override
195     protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
196         SelectionKey key = handle.keyFor(selector);
197         
198         if ((key == null) || (!key.isValid())) { 
199             return null;
200         }
201 
202         return (ConnectionRequest) key.attachment();
203     }
204 
205     /**
206      * {@inheritDoc}
207      */
208     @Override
209     protected void close(SocketChannel handle) throws Exception {
210         SelectionKey key = handle.keyFor(selector);
211         
212         if (key != null) {
213             key.cancel();
214         }
215         
216         handle.close();
217     }
218 
219     /**
220      * {@inheritDoc}
221      */
222     @Override
223     protected boolean finishConnect(SocketChannel handle) throws Exception {
224         if (handle.finishConnect()) {
225             SelectionKey key = handle.keyFor(selector);
226 
227             if (key != null) {
228                 key.cancel();
229             }
230             
231             return true;
232         }
233 
234         return false;
235     }
236 
237     /**
238      * {@inheritDoc}
239      */
240     @Override
241     protected SocketChannel newHandle(SocketAddress localAddress)
242             throws Exception {
243         SocketChannel ch = SocketChannel.open();
244 
245         int receiveBufferSize =
246             (getSessionConfig()).getReceiveBufferSize();
247         if (receiveBufferSize > 65535) {
248             ch.socket().setReceiveBufferSize(receiveBufferSize);
249         }
250 
251         if (localAddress != null) {
252             ch.socket().bind(localAddress);
253         }
254         ch.configureBlocking(false);
255         return ch;
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     @Override
262     protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
263         return new NioSocketSession(this, processor, handle);
264     }
265 
266     /**
267      * {@inheritDoc}
268      */
269     @Override
270     protected void register(SocketChannel handle, ConnectionRequest request)
271             throws Exception {
272         handle.register(selector, SelectionKey.OP_CONNECT, request);
273     }
274 
275     /**
276      * {@inheritDoc}
277      */
278     @Override
279     protected int select(int timeout) throws Exception {
280         return selector.select(timeout);
281     }
282 
283     /**
284      * {@inheritDoc}
285      */
286     @Override
287     protected Iterator<SocketChannel> selectedHandles() {
288         return new SocketChannelIterator(selector.selectedKeys());
289     }
290 
291     /**
292      * {@inheritDoc}
293      */
294     @Override
295     protected void wakeup() {
296         selector.wakeup();
297     }
298 
299     private static class SocketChannelIterator implements Iterator<SocketChannel> {
300 
301         private final Iterator<SelectionKey> i;
302 
303         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
304             this.i = selectedKeys.iterator();
305         }
306 
307         /**
308          * {@inheritDoc}
309          */
310         public boolean hasNext() {
311             return i.hasNext();
312         }
313 
314         /**
315          * {@inheritDoc}
316          */
317         public SocketChannel next() {
318             SelectionKey key = i.next();
319             return (SocketChannel) key.channel();
320         }
321 
322         /**
323          * {@inheritDoc}
324          */
325         public void remove() {
326             i.remove();
327         }
328     }
329 }