View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.nio.channels.spi.SelectorProvider;
30  import java.util.Iterator;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.locks.ReadWriteLock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  import org.apache.mina.core.RuntimeIoException;
37  import org.apache.mina.core.buffer.IoBuffer;
38  import org.apache.mina.core.file.FileRegion;
39  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
40  import org.apache.mina.core.session.SessionState;
41  
42  /**
43   * A processor for incoming and outgoing data get and written on a TCP socket.
44   *
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   */
47  public class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
48      /** The selector associated with this processor */
49      protected Selector selector;
50      
51      /** A lock used to protect concurent access to the selector */
52      protected ReadWriteLock selectorLock = new ReentrantReadWriteLock();
53  
54      protected SelectorProvider selectorProvider = null;
55  
56      /**
57       *
58       * Creates a new instance of NioProcessor.
59       *
60       * @param executor The executor to use
61       */
62      public NioProcessor(Executor executor) {
63          super(executor);
64  
65          try {
66              // Open a new selector
67              selector = Selector.open();
68          } catch (IOException e) {
69              throw new RuntimeIoException("Failed to open a selector.", e);
70          }
71      }
72  
73      /**
74       *
75       * Creates a new instance of NioProcessor.
76       *
77       * @param executor The executor to use
78       * @param selectorProvider The Selector provider to use
79       */
80      public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
81          super(executor);
82  
83          try {
84              // Open a new selector
85              if (selectorProvider == null) {
86                  selector = Selector.open();
87              } else {
88                  this.selectorProvider = selectorProvider;
89                  selector = selectorProvider.openSelector();
90              }
91          } catch (IOException e) {
92              throw new RuntimeIoException("Failed to open a selector.", e);
93          }
94      }
95  
96      @Override
97      protected void doDispose() throws Exception {
98          selectorLock.readLock().lock();
99          
100         try {
101             selector.close();
102         } finally {
103             selectorLock.readLock().unlock();
104         }
105     }
106 
107     @Override
108     protected int select(long timeout) throws Exception {
109         selectorLock.readLock().lock();
110         
111         try {
112             return selector.select(timeout);
113         } finally {
114             selectorLock.readLock().unlock();
115         }
116     }
117 
118     @Override
119     protected int select() throws Exception {
120         selectorLock.readLock().lock();
121         
122         try {
123             return selector.select();
124         } finally {
125             selectorLock.readLock().unlock();
126         }
127     }
128 
129     @Override
130     protected boolean isSelectorEmpty() {
131         selectorLock.readLock().lock();
132         
133         try {
134             return selector.keys().isEmpty();
135         } finally {
136             selectorLock.readLock().unlock();
137         }
138     }
139 
140     @Override
141     protected void wakeup() {
142         wakeupCalled.getAndSet(true);
143         selectorLock.readLock().lock();
144         
145         try {
146             selector.wakeup();
147         } finally {
148             selectorLock.readLock().unlock();
149         }
150     }
151 
152     @Override
153     protected Iterator<NioSession> allSessions() {
154         selectorLock.readLock().lock();
155         
156         try {
157             return new IoSessionIterator(selector.keys());
158         } finally {
159             selectorLock.readLock().unlock();
160         }
161     }
162     
163     @Override
164     protected int allSessionsCount()
165     {
166         return selector.keys().size();
167     }
168 
169     @SuppressWarnings("synthetic-access")
170     @Override
171     protected Iterator<NioSession> selectedSessions() {
172         return new IoSessionIterator(selector.selectedKeys());
173     }
174 
175     @Override
176     protected void init(NioSession session) throws Exception {
177         SelectableChannel ch = (SelectableChannel) session.getChannel();
178         ch.configureBlocking(false);
179         selectorLock.readLock().lock();
180         
181         try {
182             session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
183         } finally {
184             selectorLock.readLock().unlock();
185         }
186     }
187 
188     @Override
189     protected void destroy(NioSession session) throws Exception {
190         ByteChannel ch = session.getChannel();
191         
192         SelectionKey key = session.getSelectionKey();
193         
194         if (key != null) {
195             key.cancel();
196         }
197         
198         if ( ch.isOpen() ) {
199             ch.close();
200         }
201     }
202 
203     /**
204      * In the case we are using the java select() method, this method is used to
205      * trash the buggy selector and create a new one, registering all the
206      * sockets on it.
207      */
208     @Override
209     protected void registerNewSelector() throws IOException {
210         selectorLock.writeLock().lock();
211         
212         try {
213             Set<SelectionKey> keys = selector.keys();
214             Selector newSelector;
215 
216             // Open a new selector
217             if (selectorProvider == null) {
218                 newSelector = Selector.open();
219             } else {
220                 newSelector = selectorProvider.openSelector();
221             }
222 
223             // Loop on all the registered keys, and register them on the new selector
224             for (SelectionKey key : keys) {
225                 SelectableChannel ch = key.channel();
226 
227                 // Don't forget to attache the session, and back !
228                 NioSession/../../../../org/apache/mina/transport/socket/nio/NioSession.html#NioSession">NioSession session = (NioSession) key.attachment();
229                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
230                 session.setSelectionKey(newKey);
231             }
232 
233             // Now we can close the old selector and switch it
234             selector.close();
235             selector = newSelector;
236         } finally {
237             selectorLock.writeLock().unlock();
238         }
239 
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     @Override
246     protected boolean isBrokenConnection() throws IOException {
247         // A flag set to true if we find a broken session
248         boolean brokenSession = false;
249 
250         selectorLock.readLock().lock();
251         
252         try {
253             // Get the selector keys
254             Set<SelectionKey> keys = selector.keys();
255 
256             // Loop on all the keys to see if one of them
257             // has a closed channel
258             for (SelectionKey key : keys) {
259                 SelectableChannel channel = key.channel();
260 
261                 if (((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected())
262                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
263                     // The channel is not connected anymore. Cancel
264                     // the associated key then.
265                     key.cancel();
266 
267                     // Set the flag to true to avoid a selector switch
268                     brokenSession = true;
269                 }
270             }
271         } finally {
272             selectorLock.readLock().unlock();
273         }
274 
275         return brokenSession;
276     }
277 
278     /**
279      * {@inheritDoc}
280      */
281     @Override
282     protected SessionState getState(NioSession session) {
283         SelectionKey key = session.getSelectionKey();
284 
285         if (key == null) {
286             // The channel is not yet regisetred to a selector
287             return SessionState.OPENING;
288         }
289 
290         if (key.isValid()) {
291             // The session is opened
292             return SessionState.OPENED;
293         } else {
294             // The session still as to be closed
295             return SessionState.CLOSING;
296         }
297     }
298 
299     @Override
300     protected boolean isReadable(NioSession session) {
301         SelectionKey key = session.getSelectionKey();
302 
303         return (key != null) && key.isValid() && key.isReadable();
304     }
305 
306     @Override
307     protected boolean isWritable(NioSession session) {
308         SelectionKey key = session.getSelectionKey();
309 
310         return (key != null) && key.isValid() && key.isWritable();
311     }
312 
313     @Override
314     protected boolean isInterestedInRead(NioSession session) {
315         SelectionKey key = session.getSelectionKey();
316 
317         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
318     }
319 
320     @Override
321     protected boolean isInterestedInWrite(NioSession session) {
322         SelectionKey key = session.getSelectionKey();
323 
324         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
325     }
326 
327     /**
328      * {@inheritDoc}
329      */
330     @Override
331     protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
332         SelectionKey key = session.getSelectionKey();
333 
334         if ((key == null) || !key.isValid()) {
335             return;
336         }
337 
338         int oldInterestOps = key.interestOps();
339         int newInterestOps = oldInterestOps;
340 
341         if (isInterested) {
342             newInterestOps |= SelectionKey.OP_READ;
343         } else {
344             newInterestOps &= ~SelectionKey.OP_READ;
345         }
346 
347         if (oldInterestOps != newInterestOps) {
348             key.interestOps(newInterestOps);
349         }
350     }
351 
352     /**
353      * {@inheritDoc}
354      */
355     @Override
356     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
357         SelectionKey key = session.getSelectionKey();
358 
359         if ((key == null) || !key.isValid()) {
360             return;
361         }
362 
363         int newInterestOps = key.interestOps();
364 
365         if (isInterested) {
366             newInterestOps |= SelectionKey.OP_WRITE;
367         } else {
368             newInterestOps &= ~SelectionKey.OP_WRITE;
369         }
370 
371         key.interestOps(newInterestOps);
372     }
373 
374     @Override
375     protected int read(NioSession session, IoBuffer buf) throws Exception {
376         ByteChannel channel = session.getChannel();
377 
378         return channel.read(buf.buf());
379     }
380 
381     @Override
382     protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
383         if (buf.remaining() <= length) {
384             return session.getChannel().write(buf.buf());
385         }
386 
387         int oldLimit = buf.limit();
388         buf.limit(buf.position() + length);
389         try {
390             return session.getChannel().write(buf.buf());
391         } finally {
392             buf.limit(oldLimit);
393         }
394     }
395 
396     @Override
397     protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
398         try {
399             return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
400         } catch (IOException e) {
401             // Check to see if the IOException is being thrown due to
402             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
403             String message = e.getMessage();
404             if ((message != null) && message.contains("temporarily unavailable")) {
405                 return 0;
406             }
407 
408             throw e;
409         }
410     }
411 
412     /**
413      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
414      * the {@link Selector#keys()} iterator;
415      */
416     protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
417         private final Iterator<SelectionKey> iterator;
418 
419         /**
420          * Create this iterator as a wrapper on top of the selectionKey Set.
421          *
422          * @param keys
423          *            The set of selected sessions
424          */
425         private IoSessionIterator(Set<SelectionKey> keys) {
426             iterator = keys.iterator();
427         }
428 
429         /**
430          * {@inheritDoc}
431          */
432         @Override
433         public boolean hasNext() {
434             return iterator.hasNext();
435         }
436 
437         /**
438          * {@inheritDoc}
439          */
440         @Override
441         public NioSession next() {
442             SelectionKey key = iterator.next();
443             
444             return (NioSession) key.attachment();
445         }
446 
447         /**
448          * {@inheritDoc}
449          */
450         @Override
451         public void remove() {
452             iterator.remove();
453         }
454     }
455 }