View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.Iterator;
30  import java.util.Set;
31  import java.util.concurrent.Executor;
32  
33  import org.apache.mina.core.RuntimeIoException;
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.FileRegion;
36  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
37  import org.apache.mina.core.session.SessionState;
38  
39  /**
40   * TODO Add documentation
41   *
42   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
43   */
44  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
45      /** The selector associated with this processor */
46      private Selector selector;
47  
48      /**
49       *
50       * Creates a new instance of NioProcessor.
51       *
52       * @param executor
53       */
54      public NioProcessor(Executor executor) {
55          super(executor);
56  
57          try {
58              // Open a new selector
59              selector = Selector.open();
60          } catch (IOException e) {
61              throw new RuntimeIoException("Failed to open a selector.", e);
62          }
63      }
64  
65      @Override
66      protected void doDispose() throws Exception {
67          selector.close();
68      }
69  
70      @Override
71      protected int select(long timeout) throws Exception {
72          return selector.select(timeout);
73      }
74  
75      @Override
76      protected int select() throws Exception {
77          return selector.select();
78      }
79  
80      @Override
81      protected boolean isSelectorEmpty() {
82          return selector.keys().isEmpty();
83      }
84  
85      @Override
86      protected void wakeup() {
87          wakeupCalled.getAndSet(true);
88          selector.wakeup();
89      }
90  
91      @Override
92      protected Iterator<NioSession> allSessions() {
93          return new IoSessionIterator(selector.keys());
94      }
95  
96      @SuppressWarnings("synthetic-access")
97      @Override
98      protected Iterator<NioSession> selectedSessions() {
99          return new IoSessionIterator(selector.selectedKeys());
100     }
101 
102     @Override
103     protected void init(NioSession session) throws Exception {
104         SelectableChannel ch = (SelectableChannel) session.getChannel();
105         ch.configureBlocking(false);
106         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
107                 session));
108     }
109 
110     @Override
111     protected void destroy(NioSession session) throws Exception {
112         ByteChannel ch = session.getChannel();
113         SelectionKey key = session.getSelectionKey();
114         if (key != null) {
115             key.cancel();
116         }
117         ch.close();
118     }
119 
120 
121     /**
122      * In the case we are using the java select() method, this method is used to
123      * trash the buggy selector and create a new one, registering all the
124      * sockets on it.
125      */
126     @Override
127     protected void registerNewSelector() throws IOException {
128         synchronized (selector) {
129             Set<SelectionKey> keys = selector.keys();
130 
131             // Open a new selector
132             Selector newSelector = Selector.open();
133 
134             // Loop on all the registered keys, and register them on the new selector
135             for (SelectionKey key : keys) {
136                 SelectableChannel ch = key.channel();
137 
138                 // Don't forget to attache the session, and back !
139                 NioSession session = (NioSession)key.attachment();
140                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
141                 session.setSelectionKey( newKey );
142             }
143 
144             // Now we can close the old selector and switch it
145             selector.close();
146             selector = newSelector;
147         }
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     protected boolean isBrokenConnection() throws IOException {
155         // A flag set to true if we find a broken session
156         boolean brokenSession = false;
157 
158         synchronized (selector) {
159             // Get the selector keys
160             Set<SelectionKey> keys = selector.keys();
161 
162             // Loop on all the keys to see if one of them
163             // has a closed channel
164             for (SelectionKey key : keys) {
165                 SelectableChannel channel = key.channel();
166 
167                 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel)
168                         .isConnected()))
169                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel)
170                                 .isConnected())) {
171                     // The channel is not connected anymore. Cancel
172                     // the associated key then.
173                     key.cancel();
174 
175                     // Set the flag to true to avoid a selector switch
176                     brokenSession = true;
177                 }
178             }
179         }
180 
181         return brokenSession;
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     @Override
188     protected SessionState getState(NioSession session) {
189         SelectionKey key = session.getSelectionKey();
190 
191         if (key == null) {
192             // The channel is not yet registred to a selector
193             return SessionState.OPENING;
194         }
195 
196         if (key.isValid()) {
197             // The session is opened
198             return SessionState.OPENED;
199         } else {
200             // The session still as to be closed
201             return SessionState.CLOSING;
202         }
203     }
204 
205     @Override
206     protected boolean isReadable(NioSession session) {
207         SelectionKey key = session.getSelectionKey();
208         return key.isValid() && key.isReadable();
209     }
210 
211     @Override
212     protected boolean isWritable(NioSession session) {
213         SelectionKey key = session.getSelectionKey();
214         return key.isValid() && key.isWritable();
215     }
216 
217     @Override
218     protected boolean isInterestedInRead(NioSession session) {
219         SelectionKey key = session.getSelectionKey();
220         return key.isValid() && ( (key.interestOps() & SelectionKey.OP_READ) != 0 );
221     }
222 
223     @Override
224     protected boolean isInterestedInWrite(NioSession session) {
225         SelectionKey key = session.getSelectionKey();
226         return key.isValid()
227                 && ( (key.interestOps() & SelectionKey.OP_WRITE) != 0 );
228     }
229 
230     /**
231      * {@inheritDoc}
232      */
233     @Override
234     protected void setInterestedInRead(NioSession session, boolean isInterested)
235             throws Exception {
236         SelectionKey key = session.getSelectionKey();
237         int oldInterestOps = key.interestOps();
238         int newInterestOps = oldInterestOps;
239 
240         if (isInterested) {
241             newInterestOps |= SelectionKey.OP_READ;
242         } else {
243             newInterestOps &= ~SelectionKey.OP_READ;
244         }
245 
246         if (oldInterestOps != newInterestOps) {
247             key.interestOps(newInterestOps);
248         }
249     }
250 
251     /**
252      * {@inheritDoc}
253      */
254     @Override
255     protected void setInterestedInWrite(NioSession session, boolean isInterested)
256             throws Exception {
257         SelectionKey key = session.getSelectionKey();
258 
259         if (key == null) {
260             return;
261         }
262 
263         int newInterestOps = key.interestOps();
264 
265         if (isInterested) {
266             newInterestOps |= SelectionKey.OP_WRITE;
267             //newInterestOps &= ~SelectionKey.OP_READ;
268         } else {
269             newInterestOps &= ~SelectionKey.OP_WRITE;
270             //newInterestOps |= SelectionKey.OP_READ;
271         }
272 
273         key.interestOps(newInterestOps);
274     }
275 
276     @Override
277     protected int read(NioSession session, IoBuffer buf) throws Exception {
278         ByteChannel channel = session.getChannel();
279 
280         return channel.read(buf.buf());
281     }
282 
283     @Override
284     protected int write(NioSession session, IoBuffer buf, int length)
285             throws Exception {
286         if (buf.remaining() <= length) {
287             return session.getChannel().write(buf.buf());
288         }
289 
290         int oldLimit = buf.limit();
291         buf.limit(buf.position() + length);
292         try {
293             return session.getChannel().write(buf.buf());
294         } finally {
295             buf.limit(oldLimit);
296         }
297     }
298 
299     @Override
300     protected int transferFile(NioSession session, FileRegion region, int length)
301             throws Exception {
302         try {
303             return (int) region.getFileChannel().transferTo(
304                     region.getPosition(), length, session.getChannel());
305         } catch (IOException e) {
306             // Check to see if the IOException is being thrown due to
307             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
308             String message = e.getMessage();
309             if (( message != null ) && message.contains("temporarily unavailable")) {
310                 return 0;
311             }
312 
313             throw e;
314         }
315     }
316 
317     /**
318      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
319      * the {@link Selector#keys()} iterator;
320      */
321     protected static class IoSessionIterator<NioSession> implements
322             Iterator<NioSession> {
323         private final Iterator<SelectionKey> iterator;
324 
325         /**
326          * Create this iterator as a wrapper on top of the selectionKey Set.
327          *
328          * @param keys
329          *            The set of selected sessions
330          */
331         private IoSessionIterator(Set<SelectionKey> keys) {
332             iterator = keys.iterator();
333         }
334 
335         /**
336          * {@inheritDoc}
337          */
338         public boolean hasNext() {
339             return iterator.hasNext();
340         }
341 
342         /**
343          * {@inheritDoc}
344          */
345         public NioSession next() {
346             SelectionKey key = iterator.next();
347             NioSession nioSession = (NioSession) key.attachment();
348             return nioSession;
349         }
350 
351         /**
352          * {@inheritDoc}
353          */
354         public void remove() {
355             iterator.remove();
356         }
357     }
358 }