View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.Collection;
28  import java.util.Iterator;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.polling.AbstractPollingIoConnector;
32  import org.apache.mina.core.service.IoConnector;
33  import org.apache.mina.core.service.IoProcessor;
34  import org.apache.mina.core.service.SimpleIoProcessorPool;
35  import org.apache.mina.core.service.TransportMetadata;
36  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37  import org.apache.mina.transport.socket.SocketConnector;
38  import org.apache.mina.transport.socket.SocketSessionConfig;
39  
40  /**
41   * {@link IoConnector} for socket transport (TCP/IP).
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
45   */
46  public final class NioSocketConnector
47          extends AbstractPollingIoConnector<NioSession, SocketChannel>
48          implements SocketConnector {
49  
50      private volatile Selector selector;
51  
52      /**
53       * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
54       */
55      public NioSocketConnector() {
56          super(new DefaultSocketSessionConfig(), NioProcessor.class);
57          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58      }
59  
60      /**
61       * Constructor for {@link NioSocketConnector} with default configuration, and 
62       * given number of {@link NioProcessor} for multithreading I/O operations
63       * @param processorCount the number of processor to create and place in a
64       * {@link SimpleIoProcessorPool} 
65       */
66      public NioSocketConnector(int processorCount) {
67          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69      }
70  
71      /**
72       *  Constructor for {@link NioSocketConnector} with default configuration but a
73       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
74       *  {@link IoService} of the same type.
75       * @param processor the processor to use for managing I/O events
76       */
77      public NioSocketConnector(IoProcessor<NioSession> processor) {
78          super(new DefaultSocketSessionConfig(), processor);
79          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80      }
81  
82      /**
83       *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling 
84       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
85       *  the same processor and executor over multiple {@link IoService} of the same type.
86       * @param executor the executor for connection
87       * @param processor the processor for I/O operations
88       */
89      public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90          super(new DefaultSocketSessionConfig(), executor, processor);
91          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92      }
93  
94      /**
95       * {@inheritDoc}
96       */
97      @Override
98      protected void init() throws Exception {
99          this.selector = Selector.open();
100     }
101 
102     /**
103      * {@inheritDoc}
104      */
105     @Override
106     protected void destroy() throws Exception {
107         if (selector != null) {
108             selector.close();
109         }
110     }
111 
112     /**
113      * {@inheritDoc}
114      */
115     public TransportMetadata getTransportMetadata() {
116         return NioSocketSession.METADATA;
117     }
118 
119     /**
120      * {@inheritDoc}
121      */
122     @Override
123     public SocketSessionConfig getSessionConfig() {
124         return (SocketSessionConfig) super.getSessionConfig();
125     }
126     
127     /**
128      * {@inheritDoc}
129      */
130     @Override
131     public InetSocketAddress getDefaultRemoteAddress() {
132         return (InetSocketAddress) super.getDefaultRemoteAddress();
133     }
134     
135     /**
136      * {@inheritDoc}
137      */
138     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
139         super.setDefaultRemoteAddress(defaultRemoteAddress);
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     protected Iterator<SocketChannel> allHandles() {
147         return new SocketChannelIterator(selector.keys());
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
155             throws Exception {
156         return handle.connect(remoteAddress);
157     }
158 
159     /**
160      * {@inheritDoc}
161      */
162     @Override
163     protected ConnectionRequest connectionRequest(SocketChannel handle) {
164         SelectionKey key = handle.keyFor(selector);
165         if (key == null) {
166             return null;
167         }
168 
169         return (ConnectionRequest) key.attachment();
170     }
171 
172     /**
173      * {@inheritDoc}
174      */
175     @Override
176     protected void close(SocketChannel handle) throws Exception {
177         SelectionKey key = handle.keyFor(selector);
178         if (key != null) {
179             key.cancel();
180         }
181         handle.close();
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     @Override
188     protected boolean finishConnect(SocketChannel handle) throws Exception {
189         SelectionKey key = handle.keyFor(selector);
190         if (handle.finishConnect()) {
191             if (key != null) {
192                 key.cancel();
193             }
194             return true;
195         }
196 
197         return false;
198     }
199 
200     /**
201      * {@inheritDoc}
202      */
203     @Override
204     protected SocketChannel newHandle(SocketAddress localAddress)
205             throws Exception {
206         SocketChannel ch = SocketChannel.open();
207 
208         int receiveBufferSize =
209             (getSessionConfig()).getReceiveBufferSize();
210         if (receiveBufferSize > 65535) {
211             ch.socket().setReceiveBufferSize(receiveBufferSize);
212         }
213 
214         if (localAddress != null) {
215             ch.socket().bind(localAddress);
216         }
217         ch.configureBlocking(false);
218         return ch;
219     }
220 
221     /**
222      * {@inheritDoc}
223      */
224     @Override
225     protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
226         return new NioSocketSession(this, processor, handle);
227     }
228 
229     /**
230      * {@inheritDoc}
231      */
232     @Override
233     protected void register(SocketChannel handle, ConnectionRequest request)
234             throws Exception {
235         handle.register(selector, SelectionKey.OP_CONNECT, request);
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     @Override
242     protected boolean select(int timeout) throws Exception {
243         return selector.select(timeout) > 0;
244     }
245 
246     /**
247      * {@inheritDoc}
248      */
249     @Override
250     protected Iterator<SocketChannel> selectedHandles() {
251         return new SocketChannelIterator(selector.selectedKeys());
252     }
253 
254     /**
255      * {@inheritDoc}
256      */
257     @Override
258     protected void wakeup() {
259         selector.wakeup();
260     }
261 
262     private static class SocketChannelIterator implements Iterator<SocketChannel> {
263 
264         private final Iterator<SelectionKey> i;
265 
266         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
267             this.i = selectedKeys.iterator();
268         }
269 
270         /**
271          * {@inheritDoc}
272          */
273         public boolean hasNext() {
274             return i.hasNext();
275         }
276 
277         /**
278          * {@inheritDoc}
279          */
280         public SocketChannel next() {
281             SelectionKey key = i.next();
282             return (SocketChannel) key.channel();
283         }
284 
285         /**
286          * {@inheritDoc}
287          */
288         public void remove() {
289             i.remove();
290         }
291     }
292 }