001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.socket.nio; 021 022import java.io.IOException; 023import java.nio.channels.ByteChannel; 024import java.nio.channels.DatagramChannel; 025import java.nio.channels.SelectableChannel; 026import java.nio.channels.SelectionKey; 027import java.nio.channels.Selector; 028import java.nio.channels.SocketChannel; 029import java.nio.channels.spi.SelectorProvider; 030import java.util.Iterator; 031import java.util.Set; 032import java.util.concurrent.Executor; 033 034import org.apache.mina.core.RuntimeIoException; 035import org.apache.mina.core.buffer.IoBuffer; 036import org.apache.mina.core.file.FileRegion; 037import org.apache.mina.core.polling.AbstractPollingIoProcessor; 038import org.apache.mina.core.session.SessionState; 039 040/** 041 * TODO Add documentation 042 * 043 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 044 */ 045public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> { 046 /** The selector associated with this processor */ 047 private Selector selector; 048 049 private SelectorProvider selectorProvider = null; 050 051 /** 052 * 053 * Creates a new instance of NioProcessor. 054 * 055 * @param executor The executor to use 056 */ 057 public NioProcessor(Executor executor) { 058 super(executor); 059 060 try { 061 // Open a new selector 062 selector = Selector.open(); 063 } catch (IOException e) { 064 throw new RuntimeIoException("Failed to open a selector.", e); 065 } 066 } 067 068 /** 069 * 070 * Creates a new instance of NioProcessor. 071 * 072 * @param executor The executor to use 073 * @param selectorProvider The Selector provider to use 074 */ 075 public NioProcessor(Executor executor, SelectorProvider selectorProvider) { 076 super(executor); 077 078 try { 079 // Open a new selector 080 if (selectorProvider == null) { 081 selector = Selector.open(); 082 } else { 083 selector = selectorProvider.openSelector(); 084 } 085 086 } catch (IOException e) { 087 throw new RuntimeIoException("Failed to open a selector.", e); 088 } 089 } 090 091 @Override 092 protected void doDispose() throws Exception { 093 selector.close(); 094 } 095 096 @Override 097 protected int select(long timeout) throws Exception { 098 return selector.select(timeout); 099 } 100 101 @Override 102 protected int select() throws Exception { 103 return selector.select(); 104 } 105 106 @Override 107 protected boolean isSelectorEmpty() { 108 return selector.keys().isEmpty(); 109 } 110 111 @Override 112 protected void wakeup() { 113 wakeupCalled.getAndSet(true); 114 selector.wakeup(); 115 } 116 117 @Override 118 protected Iterator<NioSession> allSessions() { 119 return new IoSessionIterator(selector.keys()); 120 } 121 122 @SuppressWarnings("synthetic-access") 123 @Override 124 protected Iterator<NioSession> selectedSessions() { 125 return new IoSessionIterator(selector.selectedKeys()); 126 } 127 128 @Override 129 protected void init(NioSession session) throws Exception { 130 SelectableChannel ch = (SelectableChannel) session.getChannel(); 131 ch.configureBlocking(false); 132 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); 133 } 134 135 @Override 136 protected void destroy(NioSession session) throws Exception { 137 ByteChannel ch = session.getChannel(); 138 SelectionKey key = session.getSelectionKey(); 139 if (key != null) { 140 key.cancel(); 141 } 142 ch.close(); 143 } 144 145 /** 146 * In the case we are using the java select() method, this method is used to 147 * trash the buggy selector and create a new one, registering all the 148 * sockets on it. 149 */ 150 @Override 151 protected void registerNewSelector() throws IOException { 152 synchronized (selector) { 153 Set<SelectionKey> keys = selector.keys(); 154 155 // Open a new selector 156 Selector newSelector = null; 157 158 if (selectorProvider == null) { 159 newSelector = Selector.open(); 160 } else { 161 newSelector = selectorProvider.openSelector(); 162 } 163 164 // Loop on all the registered keys, and register them on the new selector 165 for (SelectionKey key : keys) { 166 SelectableChannel ch = key.channel(); 167 168 // Don't forget to attache the session, and back ! 169 NioSession session = (NioSession) key.attachment(); 170 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session); 171 session.setSelectionKey(newKey); 172 } 173 174 // Now we can close the old selector and switch it 175 selector.close(); 176 selector = newSelector; 177 } 178 } 179 180 /** 181 * {@inheritDoc} 182 */ 183 @Override 184 protected boolean isBrokenConnection() throws IOException { 185 // A flag set to true if we find a broken session 186 boolean brokenSession = false; 187 188 synchronized (selector) { 189 // Get the selector keys 190 Set<SelectionKey> keys = selector.keys(); 191 192 // Loop on all the keys to see if one of them 193 // has a closed channel 194 for (SelectionKey key : keys) { 195 SelectableChannel channel = key.channel(); 196 197 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected())) 198 || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) { 199 // The channel is not connected anymore. Cancel 200 // the associated key then. 201 key.cancel(); 202 203 // Set the flag to true to avoid a selector switch 204 brokenSession = true; 205 } 206 } 207 } 208 209 return brokenSession; 210 } 211 212 /** 213 * {@inheritDoc} 214 */ 215 @Override 216 protected SessionState getState(NioSession session) { 217 SelectionKey key = session.getSelectionKey(); 218 219 if (key == null) { 220 // The channel is not yet registred to a selector 221 return SessionState.OPENING; 222 } 223 224 if (key.isValid()) { 225 // The session is opened 226 return SessionState.OPENED; 227 } else { 228 // The session still as to be closed 229 return SessionState.CLOSING; 230 } 231 } 232 233 @Override 234 protected boolean isReadable(NioSession session) { 235 SelectionKey key = session.getSelectionKey(); 236 237 return (key != null) && key.isValid() && key.isReadable(); 238 } 239 240 @Override 241 protected boolean isWritable(NioSession session) { 242 SelectionKey key = session.getSelectionKey(); 243 244 return (key != null) && key.isValid() && key.isWritable(); 245 } 246 247 @Override 248 protected boolean isInterestedInRead(NioSession session) { 249 SelectionKey key = session.getSelectionKey(); 250 251 return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0); 252 } 253 254 @Override 255 protected boolean isInterestedInWrite(NioSession session) { 256 SelectionKey key = session.getSelectionKey(); 257 258 return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0); 259 } 260 261 /** 262 * {@inheritDoc} 263 */ 264 @Override 265 protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception { 266 SelectionKey key = session.getSelectionKey(); 267 268 if ((key == null) || !key.isValid()) { 269 return; 270 } 271 272 int oldInterestOps = key.interestOps(); 273 int newInterestOps = oldInterestOps; 274 275 if (isInterested) { 276 newInterestOps |= SelectionKey.OP_READ; 277 } else { 278 newInterestOps &= ~SelectionKey.OP_READ; 279 } 280 281 if (oldInterestOps != newInterestOps) { 282 key.interestOps(newInterestOps); 283 } 284 } 285 286 /** 287 * {@inheritDoc} 288 */ 289 @Override 290 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception { 291 SelectionKey key = session.getSelectionKey(); 292 293 if ((key == null) || !key.isValid()) { 294 return; 295 } 296 297 int newInterestOps = key.interestOps(); 298 299 if (isInterested) { 300 newInterestOps |= SelectionKey.OP_WRITE; 301 } else { 302 newInterestOps &= ~SelectionKey.OP_WRITE; 303 } 304 305 key.interestOps(newInterestOps); 306 } 307 308 @Override 309 protected int read(NioSession session, IoBuffer buf) throws Exception { 310 ByteChannel channel = session.getChannel(); 311 312 return channel.read(buf.buf()); 313 } 314 315 @Override 316 protected int write(NioSession session, IoBuffer buf, int length) throws Exception { 317 if (buf.remaining() <= length) { 318 return session.getChannel().write(buf.buf()); 319 } 320 321 int oldLimit = buf.limit(); 322 buf.limit(buf.position() + length); 323 try { 324 return session.getChannel().write(buf.buf()); 325 } finally { 326 buf.limit(oldLimit); 327 } 328 } 329 330 @Override 331 protected int transferFile(NioSession session, FileRegion region, int length) throws Exception { 332 try { 333 return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel()); 334 } catch (IOException e) { 335 // Check to see if the IOException is being thrown due to 336 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988 337 String message = e.getMessage(); 338 if ((message != null) && message.contains("temporarily unavailable")) { 339 return 0; 340 } 341 342 throw e; 343 } 344 } 345 346 /** 347 * An encapsulating iterator around the {@link Selector#selectedKeys()} or 348 * the {@link Selector#keys()} iterator; 349 */ 350 protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> { 351 private final Iterator<SelectionKey> iterator; 352 353 /** 354 * Create this iterator as a wrapper on top of the selectionKey Set. 355 * 356 * @param keys 357 * The set of selected sessions 358 */ 359 private IoSessionIterator(Set<SelectionKey> keys) { 360 iterator = keys.iterator(); 361 } 362 363 /** 364 * {@inheritDoc} 365 */ 366 public boolean hasNext() { 367 return iterator.hasNext(); 368 } 369 370 /** 371 * {@inheritDoc} 372 */ 373 public NioSession next() { 374 SelectionKey key = iterator.next(); 375 NioSession nioSession = (NioSession) key.attachment(); 376 return nioSession; 377 } 378 379 /** 380 * {@inheritDoc} 381 */ 382 public void remove() { 383 iterator.remove(); 384 } 385 } 386}