View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.ServerSocket;
25  import java.net.SocketAddress;
26  import java.nio.channels.ClosedSelectorException;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.nio.channels.ServerSocketChannel;
30  import java.nio.channels.SocketChannel;
31  import java.nio.channels.spi.SelectorProvider;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.concurrent.Executor;
35  
36  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
37  import org.apache.mina.core.service.IoAcceptor;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.SimpleIoProcessorPool;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
43  import org.apache.mina.transport.socket.SocketAcceptor;
44  
45  /**
46   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
47   * handles incoming TCP/IP based socket connections.
48   *
49   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
50   */
51  public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
52  implements SocketAcceptor {
53  
54      private volatile Selector selector;
55      private volatile SelectorProvider selectorProvider = null;
56  
57      /**
58       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
59       */
60      public NioSocketAcceptor() {
61          super(new DefaultSocketSessionConfig(), NioProcessor.class);
62          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
63      }
64  
65      /**
66       * Constructor for {@link NioSocketAcceptor} using default parameters, and
67       * given number of {@link NioProcessor} for multithreading I/O operations.
68       * 
69       * @param processorCount the number of processor to create and place in a
70       * {@link SimpleIoProcessorPool}
71       */
72      public NioSocketAcceptor(int processorCount) {
73          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
74          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
75      }
76  
77      /**
78       *  Constructor for {@link NioSocketAcceptor} with default configuration but a
79       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
80       *  {@link IoService} of the same type.
81       * @param processor the processor to use for managing I/O events
82       */
83      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
84          super(new DefaultSocketSessionConfig(), processor);
85          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86      }
87  
88      /**
89       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
90       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
91       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
92       * @param executor the executor for connection
93       * @param processor the processor for I/O operations
94       */
95      public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
96          super(new DefaultSocketSessionConfig(), executor, processor);
97          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
98      }
99  
100     /**
101      * Constructor for {@link NioSocketAcceptor} using default parameters, and
102      * given number of {@link NioProcessor} for multithreading I/O operations, and
103      * a custom SelectorProvider for NIO
104      *
105      * @param processorCount the number of processor to create and place in a
106      * @param selectorProvider teh SelectorProvider to use
107      * {@link SimpleIoProcessorPool}
108      */
109     public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
110         super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
111         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
112         this.selectorProvider = selectorProvider;
113     }
114 
115     /**
116      * {@inheritDoc}
117      */
118     @Override
119     protected void init() throws Exception {
120         selector = Selector.open();
121     }
122 
123     /**
124      * {@inheritDoc}
125      */
126     @Override
127     protected void init(SelectorProvider selectorProvider) throws Exception {
128         this.selectorProvider = selectorProvider;
129 
130         if (selectorProvider == null) {
131             selector = Selector.open();
132         } else {
133             selector = selectorProvider.openSelector();
134         }
135     }
136 
137     /**
138      * {@inheritDoc}
139      */
140     @Override
141     protected void destroy() throws Exception {
142         if (selector != null) {
143             selector.close();
144         }
145     }
146 
147     /**
148      * {@inheritDoc}
149      */
150     public TransportMetadata getTransportMetadata() {
151         return NioSocketSession.METADATA;
152     }
153 
154     /**
155      * {@inheritDoc}
156      */
157     @Override
158     public InetSocketAddress getLocalAddress() {
159         return (InetSocketAddress) super.getLocalAddress();
160     }
161 
162     /**
163      * {@inheritDoc}
164      */
165     @Override
166     public InetSocketAddress getDefaultLocalAddress() {
167         return (InetSocketAddress) super.getDefaultLocalAddress();
168     }
169 
170     /**
171      * {@inheritDoc}
172      */
173     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
174         setDefaultLocalAddress((SocketAddress) localAddress);
175     }
176 
177     /**
178      * {@inheritDoc}
179      */
180     @Override
181     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
182 
183         SelectionKey key = null;
184 
185         if (handle != null) {
186             key = handle.keyFor(selector);
187         }
188 
189         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
190             return null;
191         }
192 
193         // accept the connection from the client
194         SocketChannel ch = handle.accept();
195 
196         if (ch == null) {
197             return null;
198         }
199 
200         return new NioSocketSession(this, processor, ch);
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     @Override
207     protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
208         // Creates the listening ServerSocket
209 
210         ServerSocketChannel channel = null;
211 
212         if (selectorProvider != null) {
213             channel = selectorProvider.openServerSocketChannel();
214         } else {
215             channel = ServerSocketChannel.open();
216         }
217 
218         boolean success = false;
219 
220         try {
221             // This is a non blocking socket channel
222             channel.configureBlocking(false);
223 
224             // Configure the server socket,
225             ServerSocket socket = channel.socket();
226 
227             // Set the reuseAddress flag accordingly with the setting
228             socket.setReuseAddress(isReuseAddress());
229 
230             // and bind.
231             try {
232                 socket.bind(localAddress, getBacklog());
233             } catch (IOException ioe) {
234                 // Add some info regarding the address we try to bind to the
235                 // message
236                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
237                         + ioe.getMessage();
238                 Exception e = new IOException(newMessage);
239                 e.initCause(ioe.getCause());
240 
241                 // And close the channel
242                 channel.close();
243 
244                 throw e;
245             }
246 
247             // Register the channel within the selector for ACCEPT event
248             channel.register(selector, SelectionKey.OP_ACCEPT);
249             success = true;
250         } finally {
251             if (!success) {
252                 close(channel);
253             }
254         }
255         return channel;
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     @Override
262     protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
263         return handle.socket().getLocalSocketAddress();
264     }
265 
266     /**
267      * Check if we have at least one key whose corresponding channels is
268      * ready for I/O operations.
269      *
270      * This method performs a blocking selection operation.
271      * It returns only after at least one channel is selected,
272      * this selector's wakeup method is invoked, or the current thread
273      * is interrupted, whichever comes first.
274      * 
275      * @return The number of keys having their ready-operation set updated
276      * @throws IOException If an I/O error occurs
277      * @throws ClosedSelectorException If this selector is closed
278      */
279     @Override
280     protected int select() throws Exception {
281         return selector.select();
282     }
283 
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     protected Iterator<ServerSocketChannel> selectedHandles() {
289         return new ServerSocketChannelIterator(selector.selectedKeys());
290     }
291 
292     /**
293      * {@inheritDoc}
294      */
295     @Override
296     protected void close(ServerSocketChannel handle) throws Exception {
297         SelectionKey key = handle.keyFor(selector);
298 
299         if (key != null) {
300             key.cancel();
301         }
302 
303         handle.close();
304     }
305 
306     /**
307      * {@inheritDoc}
308      */
309     @Override
310     protected void wakeup() {
311         selector.wakeup();
312     }
313 
314     /**
315      * Defines an iterator for the selected-key Set returned by the
316      * selector.selectedKeys(). It replaces the SelectionKey operator.
317      */
318     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
319         /** The selected-key iterator */
320         private final Iterator<SelectionKey> iterator;
321 
322         /**
323          * Build a SocketChannel iterator which will return a SocketChannel instead of
324          * a SelectionKey.
325          * 
326          * @param selectedKeys The selector selected-key set
327          */
328         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
329             iterator = selectedKeys.iterator();
330         }
331 
332         /**
333          * Tells if there are more SockectChannel left in the iterator
334          * @return <tt>true</tt> if there is at least one more
335          * SockectChannel object to read
336          */
337         public boolean hasNext() {
338             return iterator.hasNext();
339         }
340 
341         /**
342          * Get the next SocketChannel in the operator we have built from
343          * the selected-key et for this selector.
344          * 
345          * @return The next SocketChannel in the iterator
346          */
347         public ServerSocketChannel next() {
348             SelectionKey key = iterator.next();
349 
350             if (key.isValid() && key.isAcceptable()) {
351                 return (ServerSocketChannel) key.channel();
352             }
353 
354             return null;
355         }
356 
357         /**
358          * Remove the current SocketChannel from the iterator
359          */
360         public void remove() {
361             iterator.remove();
362         }
363     }
364 }