View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.Collection;
29  import java.util.Iterator;
30  import java.util.concurrent.Executor;
31  
32  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
33  import org.apache.mina.core.service.IoAcceptor;
34  import org.apache.mina.core.service.IoProcessor;
35  import org.apache.mina.core.service.SimpleIoProcessorPool;
36  import org.apache.mina.core.service.TransportMetadata;
37  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38  import org.apache.mina.transport.socket.SocketAcceptor;
39  import org.apache.mina.transport.socket.SocketSessionConfig;
40  
41  /**
42   * {@link IoAcceptor} for socket transport (TCP/IP).  This class
43   * handles incoming TCP/IP based socket connections.
44   *
45   * @author The Apache MINA Project (dev@mina.apache.org)
46   * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
47   */
48  public final class NioSocketAcceptor
49          extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
50          implements SocketAcceptor {
51  
52      private int backlog = 50;
53      private boolean reuseAddress = false;
54  
55      private volatile Selector selector;
56  
57      /**
58       * Constructor for {@link NioSocketAcceptor} using default parameters (multiple thread model).
59       */
60      public NioSocketAcceptor() {
61          super(new DefaultSocketSessionConfig(), NioProcessor.class);
62          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
63      }
64  
65      /**
66       * Constructor for {@link NioSocketAcceptor} using default parameters, and 
67       * given number of {@link NioProcessor} for multithreading I/O operations.
68       * 
69       * @param processorCount the number of processor to create and place in a
70       * {@link SimpleIoProcessorPool} 
71       */
72      public NioSocketAcceptor(int processorCount) {
73          super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
74          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
75      }
76  
77      /**
78      *  Constructor for {@link NioSocketAcceptor} with default configuration but a
79       *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
80       *  {@link IoService} of the same type.
81       * @param processor the processor to use for managing I/O events
82       */
83      public NioSocketAcceptor(IoProcessor<NioSession> processor) {
84          super(new DefaultSocketSessionConfig(), processor);
85          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86      }
87  
88      /**
89       *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling 
90       *  connection events and a given {@link IoProcessor} for handling I/O events, useful for 
91       *  sharing the same processor and executor over multiple {@link IoService} of the same type.
92       * @param executor the executor for connection
93       * @param processor the processor for I/O operations
94       */
95      public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
96          super(new DefaultSocketSessionConfig(), executor, processor);
97          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
98      }
99  
100     /**
101      * {@inheritDoc}
102      */
103     @Override
104     protected void init() throws Exception {
105         selector = Selector.open();
106     }
107     
108     /**
109      * {@inheritDoc}
110      */
111     @Override
112     protected void destroy() throws Exception {
113         if (selector != null) {
114             selector.close();
115         }
116     }
117 
118     /**
119      * {@inheritDoc}
120      */
121     public TransportMetadata getTransportMetadata() {
122         return NioSocketSession.METADATA;
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     @Override
129     public SocketSessionConfig getSessionConfig() {
130         return (SocketSessionConfig) super.getSessionConfig();
131     }
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     public InetSocketAddress getLocalAddress() {
138         return (InetSocketAddress) super.getLocalAddress();
139     }
140 
141     /**
142      * {@inheritDoc}
143      */
144     @Override
145     public InetSocketAddress getDefaultLocalAddress() {
146         return (InetSocketAddress) super.getDefaultLocalAddress();
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
153         setDefaultLocalAddress((SocketAddress) localAddress);
154     }
155 
156     /**
157      * {@inheritDoc}
158      */
159     public boolean isReuseAddress() {
160         return reuseAddress;
161     }
162 
163     /**
164      * {@inheritDoc}
165      */
166     public void setReuseAddress(boolean reuseAddress) {
167         synchronized (bindLock) {
168             if (isActive()) {
169                 throw new IllegalStateException(
170                         "reuseAddress can't be set while the acceptor is bound.");
171             }
172 
173             this.reuseAddress = reuseAddress;
174         }
175     }
176 
177     /**
178      * {@inheritDoc}
179      */
180     public int getBacklog() {
181         return backlog;
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     public void setBacklog(int backlog) {
188         synchronized (bindLock) {
189             if (isActive()) {
190                 throw new IllegalStateException(
191                         "backlog can't be set while the acceptor is bound.");
192             }
193 
194             this.backlog = backlog;
195         }
196     }
197 
198     /**
199      * {@inheritDoc}
200      */
201     @Override
202     protected NioSession accept(IoProcessor<NioSession> processor,
203             ServerSocketChannel handle) throws Exception {
204 
205         SelectionKey key = handle.keyFor(selector);
206         if (!key.isAcceptable()) {
207             return null;
208         }
209 
210         // accept the connection from the client
211         SocketChannel ch = handle.accept();
212         if (ch == null) {
213             return null;
214         }
215 
216         return new NioSocketSession(this, processor, ch);
217     }
218 
219     /**
220      * {@inheritDoc}
221      */
222     @Override
223     protected ServerSocketChannel open(SocketAddress localAddress)
224             throws Exception {
225         ServerSocketChannel c = ServerSocketChannel.open();
226         boolean success = false;
227         try {
228             c.configureBlocking(false);
229             // Configure the server socket,
230             c.socket().setReuseAddress(isReuseAddress());
231             // XXX: Do we need to provide this property? (I think we need to remove it.)
232             c.socket().setReceiveBufferSize(
233                     getSessionConfig().getReceiveBufferSize());
234             // and bind.
235             c.socket().bind(localAddress, getBacklog());
236             c.register(selector, SelectionKey.OP_ACCEPT);
237             success = true;
238         } finally {
239             if (!success) {
240                 close(c);
241             }
242         }
243         return c;
244     }
245 
246     /**
247      * {@inheritDoc}
248      */
249     @Override
250     protected SocketAddress localAddress(ServerSocketChannel handle)
251             throws Exception {
252         return handle.socket().getLocalSocketAddress();
253     }
254 
255     /**
256       * Check if we have at least one key whose corresponding channels is 
257       * ready for I/O operations.
258       *
259       * This method performs a blocking selection operation. 
260       * It returns only after at least one channel is selected, 
261       * this selector's wakeup method is invoked, or the current thread 
262       * is interrupted, whichever comes first.
263       * 
264       * @return <code>true</code> if one key has its ready-operation set updated
265       * @throws IOException If an I/O error occurs
266       * @throws ClosedSelectorException If this selector is closed 
267       */
268     @Override
269     protected boolean select() throws Exception {
270         return selector.select() > 0;
271     }
272 
273     /**
274      * {@inheritDoc}
275      */
276     @Override
277     protected Iterator<ServerSocketChannel> selectedHandles() {
278         return new ServerSocketChannelIterator(selector.selectedKeys());
279     }
280 
281     /**
282      * {@inheritDoc}
283      */
284     @Override
285     protected void close(ServerSocketChannel handle) throws Exception {
286         SelectionKey key = handle.keyFor(selector);
287         if (key != null) {
288             key.cancel();
289         }
290         handle.close();
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     protected void wakeup() {
298         selector.wakeup();
299     }
300 
301     private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
302 
303         private final Iterator<SelectionKey> i;
304 
305         private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
306             i = selectedKeys.iterator();
307         }
308 
309         /**
310          * {@inheritDoc}
311          */
312         public boolean hasNext() {
313             return i.hasNext();
314         }
315 
316         /**
317          * {@inheritDoc}
318          */
319         public ServerSocketChannel next() {
320             SelectionKey key = i.next();
321             return (ServerSocketChannel) key.channel();
322         }
323 
324         /**
325          * {@inheritDoc}
326          */
327         public void remove() {
328             i.remove();
329         }
330     }
331 }