View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.ServerSocket;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.ServerSocketChannel;
28  import java.nio.channels.SocketChannel;
29  import java.util.Collection;
30  import java.util.Iterator;
31  import java.util.concurrent.Executor;
32  
33  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
34  import org.apache.mina.core.service.IoAcceptor;
35  import org.apache.mina.core.service.IoProcessor;
36  import org.apache.mina.core.service.SimpleIoProcessorPool;
37  import org.apache.mina.core.service.TransportMetadata;
38  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39  import org.apache.mina.transport.socket.SocketAcceptor;
40  import org.apache.mina.transport.socket.SocketSessionConfig;
41  
42  /**
43   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
44   * handles incoming TCP/IP based socket connections.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   */
48  public final class NioSocketAcceptor
49          extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
50          implements SocketAcceptor {
51  
52      /** 
53       * Define the number of socket that can wait to be accepted. Default
54       * to 50 (as in the SocketServer default).
55       */
56      private int backlog = 50;
57  
58      private boolean reuseAddress = false;
59  
60      private volatile Selector selector;
61  
62      /**
63       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
64       */
65      public NioSocketAcceptor() {
66          super(new DefaultSocketSessionConfig(), NioProcessor.class);
67          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68      }
69  
70      /**
71       * Constructor for {@link NioSocketAcceptor} using default parameters, and 
72       * given number of {@link NioProcessor} for multithreading I/O operations.
73       * 
74       * @param processorCount the number of processor to create and place in a
75       * {@link SimpleIoProcessorPool} 
76       */
77      public NioSocketAcceptor(int processorCount) {
78          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
79          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80      }
81  
82      /**
83      *  Constructor for {@link NioSocketAcceptor} with default configuration but a
84       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
85       *  {@link IoService} of the same type.
86       * @param processor the processor to use for managing I/O events
87       */
88      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
89          super(new DefaultSocketSessionConfig(), processor);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92  
93      /**
94       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling 
95       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for 
96       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
97       * @param executor the executor for connection
98       * @param processor the processor for I/O operations
99       */
100     public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
101         super(new DefaultSocketSessionConfig(), executor, processor);
102         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
103     }
104 
105     /**
106      * {@inheritDoc}
107      */
108     @Override
109     protected void init() throws Exception {
110         selector = Selector.open();
111     }
112     
113     /**
114      * {@inheritDoc}
115      */
116     @Override
117     protected void destroy() throws Exception {
118         if (selector != null) {
119             selector.close();
120         }
121     }
122 
123     /**
124      * {@inheritDoc}
125      */
126     public TransportMetadata getTransportMetadata() {
127         return NioSocketSession.METADATA;
128     }
129 
130     /**
131      * {@inheritDoc}
132      */
133     @Override
134     public SocketSessionConfig getSessionConfig() {
135         return (SocketSessionConfig) super.getSessionConfig();
136     }
137 
138     /**
139      * {@inheritDoc}
140      */
141     @Override
142     public InetSocketAddress getLocalAddress() {
143         return (InetSocketAddress) super.getLocalAddress();
144     }
145 
146     /**
147      * {@inheritDoc}
148      */
149     @Override
150     public InetSocketAddress getDefaultLocalAddress() {
151         return (InetSocketAddress) super.getDefaultLocalAddress();
152     }
153 
154     /**
155      * {@inheritDoc}
156      */
157     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
158         setDefaultLocalAddress((SocketAddress) localAddress);
159     }
160 
161     /**
162      * {@inheritDoc}
163      */
164     public boolean isReuseAddress() {
165         return reuseAddress;
166     }
167 
168     /**
169      * {@inheritDoc}
170      */
171     public void setReuseAddress(boolean reuseAddress) {
172         synchronized (bindLock) {
173             if (isActive()) {
174                 throw new IllegalStateException(
175                         "reuseAddress can't be set while the acceptor is bound.");
176             }
177 
178             this.reuseAddress = reuseAddress;
179         }
180     }
181 
182     /**
183      * {@inheritDoc}
184      */
185     public int getBacklog() {
186         return backlog;
187     }
188 
189     /**
190      * {@inheritDoc}
191      */
192     public void setBacklog(int backlog) {
193         synchronized (bindLock) {
194             if (isActive()) {
195                 throw new IllegalStateException(
196                         "backlog can't be set while the acceptor is bound.");
197             }
198 
199             this.backlog = backlog;
200         }
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     @Override
207     protected NioSession accept(IoProcessor<NioSession> processor,
208             ServerSocketChannel handle) throws Exception {
209 
210         SelectionKey key = handle.keyFor(selector);
211         
212         if ((key == null) || (!key.isValid()) || (!key.isAcceptable()) ) {
213             return null;
214         }
215 
216         // accept the connection from the client
217         SocketChannel ch = handle.accept();
218         
219         if (ch == null) {
220             return null;
221         }
222 
223         return new NioSocketSession(this, processor, ch);
224     }
225 
226     /**
227      * {@inheritDoc}
228      */
229     @Override
230     protected ServerSocketChannel open(SocketAddress localAddress)
231             throws Exception {
232         // Creates the listening ServerSocket
233         ServerSocketChannel channel = ServerSocketChannel.open();
234         
235         boolean success = false;
236         
237         try {
238             // This is a non blocking socket channel
239             channel.configureBlocking(false);
240         
241             // Configure the server socket,
242             ServerSocket socket = channel.socket();
243             
244             // Set the reuseAddress flag accordingly with the setting
245             socket.setReuseAddress(isReuseAddress());
246             
247             // and bind.
248             socket.bind(localAddress, getBacklog());
249             
250             // Register the channel within the selector for ACCEPT event
251             channel.register(selector, SelectionKey.OP_ACCEPT);
252             success = true;
253         } finally {
254             if (!success) {
255                 close(channel);
256             }
257         }
258         return channel;
259     }
260 
261     /**
262      * {@inheritDoc}
263      */
264     @Override
265     protected SocketAddress localAddress(ServerSocketChannel handle)
266             throws Exception {
267         return handle.socket().getLocalSocketAddress();
268     }
269 
270     /**
271       * Check if we have at least one key whose corresponding channels is 
272       * ready for I/O operations.
273       *
274       * This method performs a blocking selection operation. 
275       * It returns only after at least one channel is selected, 
276       * this selector's wakeup method is invoked, or the current thread 
277       * is interrupted, whichever comes first.
278       * 
279       * @return The number of keys having their ready-operation set updated
280       * @throws IOException If an I/O error occurs
281       * @throws ClosedSelectorException If this selector is closed 
282       */
283     @Override
284     protected int select() throws Exception {
285         return selector.select();
286     }
287 
288     /**
289      * {@inheritDoc}
290      */
291     @Override
292     protected Iterator<ServerSocketChannel> selectedHandles() {
293         return new ServerSocketChannelIterator(selector.selectedKeys());
294     }
295 
296     /**
297      * {@inheritDoc}
298      */
299     @Override
300     protected void close(ServerSocketChannel handle) throws Exception {
301         SelectionKey key = handle.keyFor(selector);
302         
303         if (key != null) {
304             key.cancel();
305         }
306         
307         handle.close();
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     protected void wakeup() {
315         selector.wakeup();
316     }
317 
318     /**
319      * Defines an iterator for the selected-key Set returned by the 
320      * selector.selectedKeys(). It replaces the SelectionKey operator.
321      */
322     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
323         /** The selected-key iterator */
324         private final Iterator<SelectionKey> iterator;
325 
326         /**
327          * Build a SocketChannel iterator which will return a SocketChannel instead of
328          * a SelectionKey.
329          * 
330          * @param selectedKeys The selector selected-key set 
331          */
332         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
333             iterator = selectedKeys.iterator();
334         }
335 
336         /**
337          * Tells if there are more SockectChannel left in the iterator
338          * @return <code>true</code> if there is at least one more 
339          * SockectChannel object to read
340          */
341         public boolean hasNext() {
342             return iterator.hasNext();
343         }
344 
345         /**
346          * Get the next SocketChannel in the operator we have built from
347          * the selected-key et for this selector.
348          * 
349          * @return The next SocketChannel in the iterator
350          */
351         public ServerSocketChannel next() {
352             SelectionKey key = iterator.next();
353             
354             if ( key.isValid() && key.isAcceptable() ) {
355                 return (ServerSocketChannel) key.channel();
356             }
357 
358             return null;
359         }
360 
361         /**
362          * Remove the current SocketChannel from the iterator 
363          */
364         public void remove() {
365             iterator.remove();
366         }
367     }
368 }