View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.ServerSocket;
25  import java.net.SocketAddress;
26  import java.net.StandardSocketOptions;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.nio.channels.ServerSocketChannel;
30  import java.nio.channels.SocketChannel;
31  import java.nio.channels.spi.SelectorProvider;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.concurrent.Executor;
35  
36  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
37  import org.apache.mina.core.service.IoAcceptor;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.SimpleIoProcessorPool;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
43  import org.apache.mina.transport.socket.SocketAcceptor;
44  import org.apache.mina.transport.socket.SocketSessionConfig;
45  
46  /**
47   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
48   * handles incoming TCP/IP based socket connections.
49   *
50   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
51   */
52  public class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
53  implements SocketAcceptor {
54  
55      protected volatile Selector selector;
56      protected volatile SelectorProvider selectorProvider = null;
57  
58      /**
59       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
60       */
61      public NioSocketAcceptor() {
62          super(new DefaultSocketSessionConfig(), NioProcessor.class);
63          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
64      }
65  
66      /**
67       * Constructor for {@link NioSocketAcceptor} using default parameters, and
68       * given number of {@link NioProcessor} for multithreading I/O operations.
69       * 
70       * @param processorCount the number of processor to create and place in a
71       * {@link SimpleIoProcessorPool}
72       */
73      public NioSocketAcceptor(int processorCount) {
74          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
75          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
76      }
77  
78      /**
79       *  Constructor for {@link NioSocketAcceptor} with default configuration but a
80       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
81       *  {@link IoService} of the same type.
82       * @param processor the processor to use for managing I/O events
83       */
84      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
85          super(new DefaultSocketSessionConfig(), processor);
86          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
87      }
88  
89      /**
90       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
91       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
92       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
93       * @param executor the executor for connection
94       * @param processor the processor for I/O operations
95       */
96      public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
97          super(new DefaultSocketSessionConfig(), executor, processor);
98          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
99      }
100 
101     /**
102      * Constructor for {@link NioSocketAcceptor} using default parameters, and
103      * given number of {@link NioProcessor} for multithreading I/O operations, and
104      * a custom SelectorProvider for NIO
105      *
106      * @param processorCount the number of processor to create and place in a
107      * @param selectorProvider teh SelectorProvider to use
108      * {@link SimpleIoProcessorPool}
109      */
110     public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
111         super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
112         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
113         this.selectorProvider = selectorProvider;
114     }
115 
116     /**
117      * {@inheritDoc}
118      */
119     @Override
120     protected void init() throws Exception {
121         selector = Selector.open();
122     }
123 
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     protected void init(SelectorProvider selectorProvider) throws Exception {
129         this.selectorProvider = selectorProvider;
130 
131         if (selectorProvider == null) {
132             selector = Selector.open();
133         } else {
134             selector = selectorProvider.openSelector();
135         }
136     }
137 
138     /**
139      * {@inheritDoc}
140      */
141     @Override
142     protected void destroy() throws Exception {
143         if (selector != null) {
144             selector.close();
145         }
146     }
147 
148     /**
149      * {@inheritDoc}
150      */
151     public TransportMetadata getTransportMetadata() {
152         return NioSocketSession.METADATA;
153     }
154 
155     /**
156      * {@inheritDoc}
157      */
158     @Override
159     public InetSocketAddress getLocalAddress() {
160         return (InetSocketAddress) super.getLocalAddress();
161     }
162 
163     /**
164      * {@inheritDoc}
165      */
166     @Override
167     public InetSocketAddress getDefaultLocalAddress() {
168         return (InetSocketAddress) super.getDefaultLocalAddress();
169     }
170 
171     /**
172      * {@inheritDoc}
173      */
174     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
175         setDefaultLocalAddress((SocketAddress) localAddress);
176     }
177 
178     /**
179      * {@inheritDoc}
180      */
181     @Override
182     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
183 
184         SelectionKey key = null;
185 
186         if (handle != null) {
187             key = handle.keyFor(selector);
188         }
189 
190         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
191             return null;
192         }
193 
194         // accept the connection from the client
195         try {
196             SocketChannel ch = handle.accept();
197     
198             if (ch == null) {
199                 return null;
200             }
201 
202             return new NioSocketSession(this, processor, ch);
203         } catch (Throwable t) {
204             if(t.getMessage().equals("Too many open files")) {
205                 LOGGER.error("Error Calling Accept on Socket - Sleeping Acceptor Thread. Check the ulimit parameter", t);
206                 try {
207                     // Sleep 50 ms, so that the select does not spin like crazy doing nothing but eating CPU
208                     // This is typically what will happen if we don't have any more File handle on the server
209                     // Check the ulimit parameter
210                     // NOTE : this is a workaround, there is no way we can handle this exception in any smarter way...
211                     Thread.sleep(50L);
212                 } catch (InterruptedException ie) {
213                     // Nothing to do
214                 }
215             } else {
216                 throw t;
217             }
218 
219             // No session when we have met an exception
220             return null;
221         }
222     }
223 
224     /**
225      * {@inheritDoc}
226      */
227     @Override
228     protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
229         // Creates the listening ServerSocket
230 
231 	SocketSessionConfig config = this.getSessionConfig();
232 	
233         ServerSocketChannel channel = null;
234 
235         if (selectorProvider != null) {
236             channel = selectorProvider.openServerSocketChannel();
237         } else {
238             channel = ServerSocketChannel.open();
239         }
240 
241         boolean success = false;
242 
243         try {
244             // This is a non blocking socket channel
245             channel.configureBlocking(false);
246 
247             // Configure the server socket,
248             ServerSocket socket = channel.socket();
249 
250             // Set the reuseAddress flag accordingly with the setting
251             socket.setReuseAddress(isReuseAddress());
252             
253             // Set the SND BUFF
254 	    if (config.getSendBufferSize() != -1) {
255 		channel.setOption(StandardSocketOptions.SO_SNDBUF, config.getSendBufferSize());
256 	    }
257 
258 	    // Set the RCV BUFF
259 	    if (config.getReceiveBufferSize() != -1) {
260 		channel.setOption(StandardSocketOptions.SO_RCVBUF, config.getReceiveBufferSize());
261 	    }
262 
263             // and bind.
264             try {
265                 socket.bind(localAddress, getBacklog());
266             } catch (IOException ioe) {
267                 // Add some info regarding the address we try to bind to the
268                 // message
269                 String newMessage = "Error while binding on " + localAddress;
270                 Exception e = new IOException(newMessage, ioe);
271 
272                 // And close the channel
273                 channel.close();
274 
275                 throw e;
276             }
277 
278             // Register the channel within the selector for ACCEPT event
279             channel.register(selector, SelectionKey.OP_ACCEPT);
280             success = true;
281         } finally {
282             if (!success) {
283                 close(channel);
284             }
285         }
286         return channel;
287     }
288 
289     /**
290      * {@inheritDoc}
291      */
292     @Override
293     protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
294         return handle.socket().getLocalSocketAddress();
295     }
296 
297     /**
298      * Check if we have at least one key whose corresponding channels is
299      * ready for I/O operations.
300      *
301      * This method performs a blocking selection operation.
302      * It returns only after at least one channel is selected,
303      * this selector's wakeup method is invoked, or the current thread
304      * is interrupted, whichever comes first.
305      * 
306      * @return The number of keys having their ready-operation set updated
307      * @throws IOException If an I/O error occurs
308      */
309     @Override
310     protected int select() throws Exception {
311         return selector.select();
312     }
313 
314     /**
315      * {@inheritDoc}
316      */
317     @Override
318     protected Iterator<ServerSocketChannel> selectedHandles() {
319         return new ServerSocketChannelIterator(selector.selectedKeys());
320     }
321 
322     /**
323      * {@inheritDoc}
324      */
325     @Override
326     protected void close(ServerSocketChannel handle) throws Exception {
327         SelectionKey key = handle.keyFor(selector);
328 
329         if (key != null) {
330             key.cancel();
331         }
332 
333         handle.close();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     @Override
340     protected void wakeup() {
341         selector.wakeup();
342     }
343 
344     /**
345      * Defines an iterator for the selected-key Set returned by the
346      * selector.selectedKeys(). It replaces the SelectionKey operator.
347      */
348     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
349         /** The selected-key iterator */
350         private final Iterator<SelectionKey> iterator;
351 
352         /**
353          * Build a SocketChannel iterator which will return a SocketChannel instead of
354          * a SelectionKey.
355          * 
356          * @param selectedKeys The selector selected-key set
357          */
358         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
359             iterator = selectedKeys.iterator();
360         }
361 
362         /**
363          * Tells if there are more SockectChannel left in the iterator
364          * @return <tt>true</tt> if there is at least one more
365          * SockectChannel object to read
366          */
367         public boolean hasNext() {
368             return iterator.hasNext();
369         }
370 
371         /**
372          * Get the next SocketChannel in the operator we have built from
373          * the selected-key et for this selector.
374          * 
375          * @return The next SocketChannel in the iterator
376          */
377         public ServerSocketChannel next() {
378             SelectionKey key = iterator.next();
379 
380             if (key.isValid() && key.isAcceptable()) {
381                 return (ServerSocketChannel) key.channel();
382             }
383 
384             return null;
385         }
386 
387         /**
388          * Remove the current SocketChannel from the iterator
389          */
390         public void remove() {
391             iterator.remove();
392         }
393     }
394 }