001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.nio;
021
022import java.io.IOException;
023import java.nio.channels.ByteChannel;
024import java.nio.channels.DatagramChannel;
025import java.nio.channels.SelectableChannel;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.Selector;
028import java.nio.channels.SocketChannel;
029import java.nio.channels.spi.SelectorProvider;
030import java.util.Iterator;
031import java.util.Set;
032import java.util.concurrent.Executor;
033
034import org.apache.mina.core.RuntimeIoException;
035import org.apache.mina.core.buffer.IoBuffer;
036import org.apache.mina.core.file.FileRegion;
037import org.apache.mina.core.polling.AbstractPollingIoProcessor;
038import org.apache.mina.core.session.SessionState;
039
040/**
041 * TODO Add documentation
042 *
043 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
044 */
045public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
046    /** The selector associated with this processor */
047    private Selector selector;
048
049    private SelectorProvider selectorProvider = null;
050
051    /**
052     *
053     * Creates a new instance of NioProcessor.
054     *
055     * @param executor The executor to use
056     */
057    public NioProcessor(Executor executor) {
058        super(executor);
059
060        try {
061            // Open a new selector
062            selector = Selector.open();
063        } catch (IOException e) {
064            throw new RuntimeIoException("Failed to open a selector.", e);
065        }
066    }
067
068    /**
069     *
070     * Creates a new instance of NioProcessor.
071     *
072     * @param executor The executor to use
073     * @param selectorProvider The Selector provider to use
074     */
075    public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
076        super(executor);
077
078        try {
079            // Open a new selector
080            if (selectorProvider == null) {
081                selector = Selector.open();
082            } else {
083                selector = selectorProvider.openSelector();
084            }
085
086        } catch (IOException e) {
087            throw new RuntimeIoException("Failed to open a selector.", e);
088        }
089    }
090
091    @Override
092    protected void doDispose() throws Exception {
093        selector.close();
094    }
095
096    @Override
097    protected int select(long timeout) throws Exception {
098        return selector.select(timeout);
099    }
100
101    @Override
102    protected int select() throws Exception {
103        return selector.select();
104    }
105
106    @Override
107    protected boolean isSelectorEmpty() {
108        return selector.keys().isEmpty();
109    }
110
111    @Override
112    protected void wakeup() {
113        wakeupCalled.getAndSet(true);
114        selector.wakeup();
115    }
116
117    @Override
118    protected Iterator<NioSession> allSessions() {
119        return new IoSessionIterator(selector.keys());
120    }
121
122    @SuppressWarnings("synthetic-access")
123    @Override
124    protected Iterator<NioSession> selectedSessions() {
125        return new IoSessionIterator(selector.selectedKeys());
126    }
127
128    @Override
129    protected void init(NioSession session) throws Exception {
130        SelectableChannel ch = (SelectableChannel) session.getChannel();
131        ch.configureBlocking(false);
132        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
133    }
134
135    @Override
136    protected void destroy(NioSession session) throws Exception {
137        ByteChannel ch = session.getChannel();
138        SelectionKey key = session.getSelectionKey();
139        if (key != null) {
140            key.cancel();
141        }
142        ch.close();
143    }
144
145    /**
146     * In the case we are using the java select() method, this method is used to
147     * trash the buggy selector and create a new one, registering all the
148     * sockets on it.
149     */
150    @Override
151    protected void registerNewSelector() throws IOException {
152        synchronized (selector) {
153            Set<SelectionKey> keys = selector.keys();
154
155            // Open a new selector
156            Selector newSelector = null;
157
158            if (selectorProvider == null) {
159                newSelector = Selector.open();
160            } else {
161                newSelector = selectorProvider.openSelector();
162            }
163
164            // Loop on all the registered keys, and register them on the new selector
165            for (SelectionKey key : keys) {
166                SelectableChannel ch = key.channel();
167
168                // Don't forget to attache the session, and back !
169                NioSession session = (NioSession) key.attachment();
170                SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
171                session.setSelectionKey(newKey);
172            }
173
174            // Now we can close the old selector and switch it
175            selector.close();
176            selector = newSelector;
177        }
178    }
179
180    /**
181     * {@inheritDoc}
182     */
183    @Override
184    protected boolean isBrokenConnection() throws IOException {
185        // A flag set to true if we find a broken session
186        boolean brokenSession = false;
187
188        synchronized (selector) {
189            // Get the selector keys
190            Set<SelectionKey> keys = selector.keys();
191
192            // Loop on all the keys to see if one of them
193            // has a closed channel
194            for (SelectionKey key : keys) {
195                SelectableChannel channel = key.channel();
196
197                if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
198                        || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
199                    // The channel is not connected anymore. Cancel
200                    // the associated key then.
201                    key.cancel();
202
203                    // Set the flag to true to avoid a selector switch
204                    brokenSession = true;
205                }
206            }
207        }
208
209        return brokenSession;
210    }
211
212    /**
213     * {@inheritDoc}
214     */
215    @Override
216    protected SessionState getState(NioSession session) {
217        SelectionKey key = session.getSelectionKey();
218
219        if (key == null) {
220            // The channel is not yet registred to a selector
221            return SessionState.OPENING;
222        }
223
224        if (key.isValid()) {
225            // The session is opened
226            return SessionState.OPENED;
227        } else {
228            // The session still as to be closed
229            return SessionState.CLOSING;
230        }
231    }
232
233    @Override
234    protected boolean isReadable(NioSession session) {
235        SelectionKey key = session.getSelectionKey();
236
237        return (key != null) && key.isValid() && key.isReadable();
238    }
239
240    @Override
241    protected boolean isWritable(NioSession session) {
242        SelectionKey key = session.getSelectionKey();
243
244        return (key != null) && key.isValid() && key.isWritable();
245    }
246
247    @Override
248    protected boolean isInterestedInRead(NioSession session) {
249        SelectionKey key = session.getSelectionKey();
250
251        return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
252    }
253
254    @Override
255    protected boolean isInterestedInWrite(NioSession session) {
256        SelectionKey key = session.getSelectionKey();
257
258        return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
259    }
260
261    /**
262     * {@inheritDoc}
263     */
264    @Override
265    protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
266        SelectionKey key = session.getSelectionKey();
267
268        if ((key == null) || !key.isValid()) {
269            return;
270        }
271
272        int oldInterestOps = key.interestOps();
273        int newInterestOps = oldInterestOps;
274
275        if (isInterested) {
276            newInterestOps |= SelectionKey.OP_READ;
277        } else {
278            newInterestOps &= ~SelectionKey.OP_READ;
279        }
280
281        if (oldInterestOps != newInterestOps) {
282            key.interestOps(newInterestOps);
283        }
284    }
285
286    /**
287     * {@inheritDoc}
288     */
289    @Override
290    protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
291        SelectionKey key = session.getSelectionKey();
292
293        if ((key == null) || !key.isValid()) {
294            return;
295        }
296
297        int newInterestOps = key.interestOps();
298
299        if (isInterested) {
300            newInterestOps |= SelectionKey.OP_WRITE;
301        } else {
302            newInterestOps &= ~SelectionKey.OP_WRITE;
303        }
304
305        key.interestOps(newInterestOps);
306    }
307
308    @Override
309    protected int read(NioSession session, IoBuffer buf) throws Exception {
310        ByteChannel channel = session.getChannel();
311
312        return channel.read(buf.buf());
313    }
314
315    @Override
316    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
317        if (buf.remaining() <= length) {
318            return session.getChannel().write(buf.buf());
319        }
320
321        int oldLimit = buf.limit();
322        buf.limit(buf.position() + length);
323        try {
324            return session.getChannel().write(buf.buf());
325        } finally {
326            buf.limit(oldLimit);
327        }
328    }
329
330    @Override
331    protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
332        try {
333            return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
334        } catch (IOException e) {
335            // Check to see if the IOException is being thrown due to
336            // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
337            String message = e.getMessage();
338            if ((message != null) && message.contains("temporarily unavailable")) {
339                return 0;
340            }
341
342            throw e;
343        }
344    }
345
346    /**
347     * An encapsulating iterator around the {@link Selector#selectedKeys()} or
348     * the {@link Selector#keys()} iterator;
349     */
350    protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
351        private final Iterator<SelectionKey> iterator;
352
353        /**
354         * Create this iterator as a wrapper on top of the selectionKey Set.
355         *
356         * @param keys
357         *            The set of selected sessions
358         */
359        private IoSessionIterator(Set<SelectionKey> keys) {
360            iterator = keys.iterator();
361        }
362
363        /**
364         * {@inheritDoc}
365         */
366        public boolean hasNext() {
367            return iterator.hasNext();
368        }
369
370        /**
371         * {@inheritDoc}
372         */
373        public NioSession next() {
374            SelectionKey key = iterator.next();
375            NioSession nioSession = (NioSession) key.attachment();
376            return nioSession;
377        }
378
379        /**
380         * {@inheritDoc}
381         */
382        public void remove() {
383            iterator.remove();
384        }
385    }
386}