View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.Collection;
28  import java.util.Iterator;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.polling.AbstractPollingIoConnector;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoProcessor;
34  import org.apache.mina.core.service.SimpleIoProcessorPool;
35  import org.apache.mina.core.service.TransportMetadata;
36  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37  import org.apache.mina.transport.socket.SocketConnector;
38  import org.apache.mina.transport.socket.SocketSessionConfig;
39  
40  /**
41   * {@link IoConnector} for socket transport (TCP/IP).
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   */
45  public final class NioSocketConnector
46          extends AbstractPollingIoConnector<NioSession, SocketChannel>
47          implements SocketConnector {
48  
49      private volatile Selector selector;
50  
51      /**
52       * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
53       */
54      public NioSocketConnector() {
55          super(new DefaultSocketSessionConfig(), NioProcessor.class);
56          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
57      }
58  
59      /**
60       * Constructor for {@link NioSocketConnector} with default configuration, and 
61       * given number of {@link NioProcessor} for multithreading I/O operations
62       * @param processorCount the number of processor to create and place in a
63       * {@link SimpleIoProcessorPool} 
64       */
65      public NioSocketConnector(int processorCount) {
66          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
67          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68      }
69  
70      /**
71       *  Constructor for {@link NioSocketConnector} with default configuration but a
72       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
73       *  {@link IoService} of the same type.
74       * @param processor the processor to use for managing I/O events
75       */
76      public NioSocketConnector(IoProcessor<NioSession> processor) {
77          super(new DefaultSocketSessionConfig(), processor);
78          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79      }
80  
81      /**
82       *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling 
83       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
84       *  the same processor and executor over multiple {@link IoService} of the same type.
85       * @param executor the executor for connection
86       * @param processor the processor for I/O operations
87       */
88      public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
89          super(new DefaultSocketSessionConfig(), executor, processor);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92      
93      /**
94       * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in 
95       * thread pool executor to manage the given number of processor instances. The processor class must have 
96       * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
97       * no-arg constructor.
98       * 
99       * @param processorClass the processor class.
100      * @param processorCount the number of processors to instantiate.
101      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
102      * @since 2.0.0-M4
103      */
104     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass,
105             int processorCount) {
106         super(new DefaultSocketSessionConfig(), processorClass, processorCount);
107     }
108 
109     /**
110      * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in 
111      * thread pool executor to manage the default number of processor instances. The processor class must have 
112      * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a 
113      * no-arg constructor. The default number of instances is equal to the number of processor cores 
114      * in the system, plus one.
115      * 
116      * @param processorClass the processor class.
117      * @see org.apache.mina.core.service.SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int)
118      * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
119      * @since 2.0.0-M4
120      */
121     public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
122         super(new DefaultSocketSessionConfig(), processorClass);
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     @Override
129     protected void init() throws Exception {
130         this.selector = Selector.open();
131     }
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     protected void destroy() throws Exception {
138         if (selector != null) {
139             selector.close();
140         }
141     }
142 
143     /**
144      * {@inheritDoc}
145      */
146     public TransportMetadata getTransportMetadata() {
147         return NioSocketSession.METADATA;
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     public SocketSessionConfig getSessionConfig() {
155         return (SocketSessionConfig) super.getSessionConfig();
156     }
157     
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public InetSocketAddress getDefaultRemoteAddress() {
163         return (InetSocketAddress) super.getDefaultRemoteAddress();
164     }
165     
166     /**
167      * {@inheritDoc}
168      */
169     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
170         super.setDefaultRemoteAddress(defaultRemoteAddress);
171     }
172 
173     /**
174      * {@inheritDoc}
175      */
176     @Override
177     protected Iterator<SocketChannel> allHandles() {
178         return new SocketChannelIterator(selector.keys());
179     }
180 
181     /**
182      * {@inheritDoc}
183      */
184     @Override
185     protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
186             throws Exception {
187         return handle.connect(remoteAddress);
188     }
189 
190     /**
191      * {@inheritDoc}
192      */
193     @Override
194     protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
195         SelectionKey key = handle.keyFor(selector);
196         
197         if ((key == null) || (!key.isValid())) { 
198             return null;
199         }
200 
201         return (ConnectionRequest) key.attachment();
202     }
203 
204     /**
205      * {@inheritDoc}
206      */
207     @Override
208     protected void close(SocketChannel handle) throws Exception {
209         SelectionKey key = handle.keyFor(selector);
210         
211         if (key != null) {
212             key.cancel();
213         }
214         
215         handle.close();
216     }
217 
218     /**
219      * {@inheritDoc}
220      */
221     @Override
222     protected boolean finishConnect(SocketChannel handle) throws Exception {
223         if (handle.finishConnect()) {
224             SelectionKey key = handle.keyFor(selector);
225 
226             if (key != null) {
227                 key.cancel();
228             }
229             
230             return true;
231         }
232 
233         return false;
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     protected SocketChannel newHandle(SocketAddress localAddress)
241             throws Exception {
242         SocketChannel ch = SocketChannel.open();
243 
244         int receiveBufferSize =
245             (getSessionConfig()).getReceiveBufferSize();
246         if (receiveBufferSize > 65535) {
247             ch.socket().setReceiveBufferSize(receiveBufferSize);
248         }
249 
250         if (localAddress != null) {
251             ch.socket().bind(localAddress);
252         }
253         ch.configureBlocking(false);
254         return ch;
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     @Override
261     protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
262         return new NioSocketSession(this, processor, handle);
263     }
264 
265     /**
266      * {@inheritDoc}
267      */
268     @Override
269     protected void register(SocketChannel handle, ConnectionRequest request)
270             throws Exception {
271         handle.register(selector, SelectionKey.OP_CONNECT, request);
272     }
273 
274     /**
275      * {@inheritDoc}
276      */
277     @Override
278     protected int select(int timeout) throws Exception {
279         return selector.select(timeout);
280     }
281 
282     /**
283      * {@inheritDoc}
284      */
285     @Override
286     protected Iterator<SocketChannel> selectedHandles() {
287         return new SocketChannelIterator(selector.selectedKeys());
288     }
289 
290     /**
291      * {@inheritDoc}
292      */
293     @Override
294     protected void wakeup() {
295         selector.wakeup();
296     }
297 
298     private static class SocketChannelIterator implements Iterator<SocketChannel> {
299 
300         private final Iterator<SelectionKey> i;
301 
302         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
303             this.i = selectedKeys.iterator();
304         }
305 
306         /**
307          * {@inheritDoc}
308          */
309         public boolean hasNext() {
310             return i.hasNext();
311         }
312 
313         /**
314          * {@inheritDoc}
315          */
316         public SocketChannel next() {
317             SelectionKey key = i.next();
318             return (SocketChannel) key.channel();
319         }
320 
321         /**
322          * {@inheritDoc}
323          */
324         public void remove() {
325             i.remove();
326         }
327     }
328 }