/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport.socket.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executor;

import org.apache.mina.core.polling.AbstractPollingIoConnector;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.SimpleIoProcessorPool;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.SocketSessionConfig;

/**
 * {@link IoConnector} for socket transport (TCP/IP).
 * <p>
 * Pending connections are tracked by registering each {@link SocketChannel}
 * with a private {@link Selector} for {@link SelectionKey#OP_CONNECT}, with the
 * connection request stored as the key attachment.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
        SocketConnector {

    /** Selector the connecting channels are registered with; opened by {@link #init()}, closed by {@link #destroy()}. */
    private volatile Selector selector;

    /**
     * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
     */
    public NioSocketConnector() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     * Constructor for {@link NioSocketConnector} with default configuration, and
     * given number of {@link NioProcessor} for multithreading I/O operations
     * @param processorCount the number of processor to create and place in a
     * {@link SimpleIoProcessorPool}
     */
    public NioSocketConnector(int processorCount) {
        super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     *  Constructor for {@link NioSocketConnector} with default configuration but a
     *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
     *  {@link IoService} of the same type.
     * @param processor the processor to use for managing I/O events
     */
    public NioSocketConnector(IoProcessor<NioSession> processor) {
        super(new DefaultSocketSessionConfig(), processor);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling
     *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
     *  the same processor and executor over multiple {@link IoService} of the same type.
     * @param executor the executor for connection
     * @param processor the processor for I/O operations
     */
    public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
        super(new DefaultSocketSessionConfig(), executor, processor);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
     * thread pool executor to manage the given number of processor instances. The processor class must have
     * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
     * no-arg constructor.
     *
     * @param processorClass the processor class.
     * @param processorCount the number of processors to instantiate.
     * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
     * @since 2.0.0-M4
     */
    public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
        super(new DefaultSocketSessionConfig(), processorClass, processorCount);
        // Initialize the session configuration the same way every other constructor does.
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
     * thread pool executor to manage the default number of processor instances. The processor class must have
     * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
     * no-arg constructor. The default number of instances is equal to the number of processor cores
     * in the system, plus one.
     *
     * @param processorClass the processor class.
     * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
     * @see org.apache.mina.core.service.SimpleIoProcessorPool#DEFAULT_SIZE
     * @since 2.0.0-M4
     */
    public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
        super(new DefaultSocketSessionConfig(), processorClass);
        // Initialize the session configuration the same way every other constructor does.
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Opens the selector used to track pending connections.
     */
    @Override
    protected void init() throws Exception {
        this.selector = Selector.open();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Closes the selector, if one was opened.
     */
    @Override
    protected void destroy() throws Exception {
        if (selector != null) {
            selector.close();
        }
    }

    /**
     * {@inheritDoc}
     */
    public TransportMetadata getTransportMetadata() {
        return NioSocketSession.METADATA;
    }

    /**
     * {@inheritDoc}
     */
    public SocketSessionConfig getSessionConfig() {
        return (SocketSessionConfig) sessionConfig;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InetSocketAddress getDefaultRemoteAddress() {
        return (InetSocketAddress) super.getDefaultRemoteAddress();
    }

    /**
     * {@inheritDoc}
     */
    public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
        super.setDefaultRemoteAddress(defaultRemoteAddress);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Iterates over the channels of every key currently registered with the selector.
     */
    @Override
    protected Iterator<SocketChannel> allHandles() {
        return new SocketChannelIterator(selector.keys());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
        return handle.connect(remoteAddress);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns the attachment of the channel's key for this connector's selector,
     * or <tt>null</tt> when the channel has no key for it or the key is no longer valid.
     */
    @Override
    protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid())) {
            return null;
        }

        return (ConnectionRequest) key.attachment();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Cancels the channel's key for this connector's selector (if any) before closing the channel.
     */
    @Override
    protected void close(SocketChannel handle) throws Exception {
        SelectionKey key = handle.keyFor(selector);

        if (key != null) {
            key.cancel();
        }

        handle.close();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Once the connection is established, the channel's key for this connector's
     * selector is cancelled.
     */
    @Override
    protected boolean finishConnect(SocketChannel handle) throws Exception {
        if (handle.finishConnect()) {
            SelectionKey key = handle.keyFor(selector);

            if (key != null) {
                key.cancel();
            }

            return true;
        }

        return false;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Opens a new channel, optionally binds it to <tt>localAddress</tt> and switches it
     * to non-blocking mode. If any of these steps fails the channel is closed before the
     * failure is propagated, so no half-configured channel is leaked.
     *
     * @param localAddress the local address to bind to, or <tt>null</tt> to skip the explicit bind
     * @return the configured, non-blocking channel
     * @throws Exception if the channel cannot be opened, configured or bound; a bind failure
     *         is reported as an {@link IOException} naming the address, with the original
     *         exception as its cause
     */
    @Override
    protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
        SocketChannel ch = SocketChannel.open();
        boolean success = false;

        try {
            int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();

            // Only buffers larger than 64K are applied here, i.e. before the socket is
            // connected (see java.net.Socket#setReceiveBufferSize for why that ordering
            // matters for large buffers).
            if (receiveBufferSize > 65535) {
                ch.socket().setReceiveBufferSize(receiveBufferSize);
            }

            if (localAddress != null) {
                try {
                    ch.socket().bind(localAddress);
                } catch (IOException ioe) {
                    // Add some info regarding the address we try to bind to the
                    // message
                    String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                            + ioe.getMessage();
                    IOException e = new IOException(newMessage);

                    // Chain the original exception itself so that its type and stack
                    // trace are not lost.
                    e.initCause(ioe);
                    throw e;
                }
            }

            ch.configureBlocking(false);
            success = true;

            return ch;
        } finally {
            if (!success) {
                try {
                    ch.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup: the failure that brought us here is the one
                    // worth reporting, so it must not be replaced by a close error.
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
        return new NioSocketSession(this, processor, handle);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Registers the channel with the selector for {@link SelectionKey#OP_CONNECT},
     * attaching the request to the resulting key.
     */
    @Override
    protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
        handle.register(selector, SelectionKey.OP_CONNECT, request);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int select(int timeout) throws Exception {
        return selector.select(timeout);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Iterates over the channels of the selector's currently selected keys.
     */
    @Override
    protected Iterator<SocketChannel> selectedHandles() {
        return new SocketChannelIterator(selector.selectedKeys());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void wakeup() {
        selector.wakeup();
    }

    /**
     * Adapts an iteration over {@link SelectionKey}s into an iteration over the
     * {@link SocketChannel}s they belong to. Removal is delegated to the
     * underlying key iterator.
     */
    private static class SocketChannelIterator implements Iterator<SocketChannel> {

        private final Iterator<SelectionKey> i;

        private SocketChannelIterator(Collection<SelectionKey> keys) {
            this.i = keys.iterator();
        }

        /**
         * {@inheritDoc}
         */
        public boolean hasNext() {
            return i.hasNext();
        }

        /**
         * {@inheritDoc}
         */
        public SocketChannel next() {
            SelectionKey key = i.next();
            return (SocketChannel) key.channel();
        }

        /**
         * {@inheritDoc}
         */
        public void remove() {
            i.remove();
        }
    }
}