001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.socket.apr; 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.net.SocketAddress; 025import java.nio.ByteBuffer; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.Map; 030import java.util.Queue; 031import java.util.Set; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.Executor; 034 035import org.apache.mina.core.RuntimeIoException; 036import org.apache.mina.core.polling.AbstractPollingIoConnector; 037import org.apache.mina.core.service.IoConnector; 038import org.apache.mina.core.service.IoProcessor; 039import org.apache.mina.core.service.IoService; 040import org.apache.mina.core.service.SimpleIoProcessorPool; 041import org.apache.mina.core.service.TransportMetadata; 042import org.apache.mina.transport.socket.DefaultSocketSessionConfig; 043import org.apache.mina.transport.socket.SocketConnector; 044import org.apache.mina.transport.socket.SocketSessionConfig; 045import org.apache.tomcat.jni.Address; 046import org.apache.tomcat.jni.Poll; 047import org.apache.tomcat.jni.Pool; 048import org.apache.tomcat.jni.Socket; 049import org.apache.tomcat.jni.Status; 050 051/** 052 * {@link IoConnector} for APR based socket transport (TCP/IP). 053 * 054 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 055 */ 056public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector { 057 058 /** 059 * This constant is deduced from the APR code. It is used when the timeout 060 * has expired while doing a poll() operation. 061 */ 062 private static final int APR_TIMEUP_ERROR = -120001; 063 064 private static final int POLLSET_SIZE = 1024; 065 066 private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(POLLSET_SIZE); 067 068 private final Object wakeupLock = new Object(); 069 070 private volatile long wakeupSocket; 071 072 private volatile boolean toBeWakenUp; 073 074 private volatile long pool; 075 076 private volatile long pollset; // socket poller 077 078 private final long[] polledSockets = new long[POLLSET_SIZE << 1]; 079 080 private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>(); 081 082 private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE); 083 084 private volatile ByteBuffer dummyBuffer; 085 086 /** 087 * Create an {@link AprSocketConnector} with default configuration (multiple thread model). 088 */ 089 public AprSocketConnector() { 090 super(new DefaultSocketSessionConfig(), AprIoProcessor.class); 091 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 092 } 093 094 /** 095 * Constructor for {@link AprSocketConnector} with default configuration, and 096 * given number of {@link AprIoProcessor} for multithreading I/O operations 097 * @param processorCount the number of processor to create and place in a 098 * {@link SimpleIoProcessorPool} 099 */ 100 public AprSocketConnector(int processorCount) { 101 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount); 102 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 103 } 104 105 /** 106 * Constructor for {@link AprSocketConnector} with default configuration but a 107 * specific {@link IoProcessor}, useful for sharing the same processor over multiple 108 * {@link IoService} of the same type. 109 * @param processor the processor to use for managing I/O events 110 */ 111 public AprSocketConnector(IoProcessor<AprSession> processor) { 112 super(new DefaultSocketSessionConfig(), processor); 113 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 114 } 115 116 /** 117 * Constructor for {@link AprSocketConnector} with a given {@link Executor} for handling 118 * connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 119 * the same processor and executor over multiple {@link IoService} of the same type. 120 * @param executor the executor for connection 121 * @param processor the processor for I/O operations 122 */ 123 public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) { 124 super(new DefaultSocketSessionConfig(), executor, processor); 125 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 126 } 127 128 /** 129 * {@inheritDoc} 130 */ 131 @Override 132 protected void init() throws Exception { 133 // initialize a memory pool for APR functions 134 pool = Pool.create(AprLibrary.getInstance().getRootPool()); 135 136 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool); 137 138 dummyBuffer = Pool.alloc(pool, 1); 139 140 pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 141 142 if (pollset <= 0) { 143 pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 144 } 145 146 if (pollset <= 0) { 147 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) { 148 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform."); 149 } 150 } 151 } 152 153 /** 154 * {@inheritDoc} 155 */ 156 @Override 157 protected void destroy() throws Exception { 158 if (wakeupSocket > 0) { 159 Socket.close(wakeupSocket); 160 } 161 if (pollset > 0) { 162 Poll.destroy(pollset); 163 } 164 if (pool > 0) { 165 Pool.destroy(pool); 166 } 167 } 168 169 /** 170 * {@inheritDoc} 171 */ 172 @Override 173 protected Iterator<Long> allHandles() { 174 return polledHandles.iterator(); 175 } 176 177 /** 178 * {@inheritDoc} 179 */ 180 @Override 181 protected boolean connect(Long handle, SocketAddress remoteAddress) throws Exception { 182 InetSocketAddress ra = (InetSocketAddress) remoteAddress; 183 long sa; 184 if (ra != null) { 185 if (ra.getAddress() == null) { 186 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool); 187 } else { 188 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool); 189 } 190 } else { 191 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool); 192 } 193 194 int rv = Socket.connect(handle, sa); 195 if (rv == Status.APR_SUCCESS) { 196 return true; 197 } 198 199 if (Status.APR_STATUS_IS_EINPROGRESS(rv)) { 200 return false; 201 } 202 203 throwException(rv); 204 throw new InternalError(); // This sentence will never be executed. 205 } 206 207 /** 208 * {@inheritDoc} 209 */ 210 @Override 211 protected ConnectionRequest getConnectionRequest(Long handle) { 212 return requests.get(handle); 213 } 214 215 /** 216 * {@inheritDoc} 217 */ 218 @Override 219 protected void close(Long handle) throws Exception { 220 finishConnect(handle); 221 int rv = Socket.close(handle); 222 if (rv != Status.APR_SUCCESS) { 223 throwException(rv); 224 } 225 } 226 227 /** 228 * {@inheritDoc} 229 */ 230 @Override 231 protected boolean finishConnect(Long handle) throws Exception { 232 Poll.remove(pollset, handle); 233 requests.remove(handle); 234 if (failedHandles.remove(handle)) { 235 int rv = Socket.recvb(handle, dummyBuffer, 0, 1); 236 throwException(rv); 237 throw new InternalError("Shouldn't reach here."); 238 } 239 return true; 240 } 241 242 /** 243 * {@inheritDoc} 244 */ 245 @Override 246 protected Long newHandle(SocketAddress localAddress) throws Exception { 247 long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool); 248 boolean success = false; 249 try { 250 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1); 251 if (result != Status.APR_SUCCESS) { 252 throwException(result); 253 } 254 result = Socket.timeoutSet(handle, 0); 255 if (result != Status.APR_SUCCESS) { 256 throwException(result); 257 } 258 259 if (localAddress != null) { 260 InetSocketAddress la = (InetSocketAddress) localAddress; 261 long sa; 262 263 if (la.getAddress() == null) { 264 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool); 265 } else { 266 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool); 267 } 268 269 result = Socket.bind(handle, sa); 270 if (result != Status.APR_SUCCESS) { 271 throwException(result); 272 } 273 } 274 275 success = true; 276 return handle; 277 } finally { 278 if (!success) { 279 int rv = Socket.close(handle); 280 if (rv != Status.APR_SUCCESS) { 281 throwException(rv); 282 } 283 } 284 } 285 } 286 287 /** 288 * {@inheritDoc} 289 */ 290 @Override 291 protected AprSession newSession(IoProcessor<AprSession> processor, Long handle) throws Exception { 292 return new AprSocketSession(this, processor, handle); 293 } 294 295 /** 296 * {@inheritDoc} 297 */ 298 @Override 299 protected void register(Long handle, ConnectionRequest request) throws Exception { 300 int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT); 301 if (rv != Status.APR_SUCCESS) { 302 throwException(rv); 303 } 304 305 requests.put(handle, request); 306 } 307 308 /** 309 * {@inheritDoc} 310 */ 311 @Override 312 protected int select(int timeout) throws Exception { 313 int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false); 314 if (rv <= 0) { 315 if (rv != APR_TIMEUP_ERROR) { 316 throwException(rv); 317 } 318 319 rv = Poll.maintain(pollset, polledSockets, true); 320 if (rv > 0) { 321 for (int i = 0; i < rv; i++) { 322 Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT); 323 } 324 } else if (rv < 0) { 325 throwException(rv); 326 } 327 328 return 0; 329 } else { 330 rv <<= 1; 331 if (!polledHandles.isEmpty()) { 332 polledHandles.clear(); 333 } 334 335 for (int i = 0; i < rv; i++) { 336 long flag = polledSockets[i]; 337 long socket = polledSockets[++i]; 338 if (socket == wakeupSocket) { 339 synchronized (wakeupLock) { 340 Poll.remove(pollset, wakeupSocket); 341 toBeWakenUp = false; 342 } 343 continue; 344 } 345 polledHandles.add(socket); 346 if ((flag & Poll.APR_POLLOUT) == 0) { 347 failedHandles.add(socket); 348 } 349 } 350 return polledHandles.size(); 351 } 352 } 353 354 /** 355 * {@inheritDoc} 356 */ 357 @Override 358 protected Iterator<Long> selectedHandles() { 359 return polledHandles.iterator(); 360 } 361 362 /** 363 * {@inheritDoc} 364 */ 365 @Override 366 protected void wakeup() { 367 if (toBeWakenUp) { 368 return; 369 } 370 371 // Add a dummy socket to the pollset. 372 synchronized (wakeupLock) { 373 toBeWakenUp = true; 374 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT); 375 } 376 } 377 378 /** 379 * {@inheritDoc} 380 */ 381 public TransportMetadata getTransportMetadata() { 382 return AprSocketSession.METADATA; 383 } 384 385 /** 386 * {@inheritDoc} 387 */ 388 public SocketSessionConfig getSessionConfig() { 389 return (SocketSessionConfig) sessionConfig; 390 } 391 392 /** 393 * {@inheritDoc} 394 */ 395 @Override 396 public InetSocketAddress getDefaultRemoteAddress() { 397 return (InetSocketAddress) super.getDefaultRemoteAddress(); 398 } 399 400 /** 401 * {@inheritDoc} 402 */ 403 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) { 404 super.setDefaultRemoteAddress(defaultRemoteAddress); 405 } 406 407 /** 408 * transform an APR error number in a more fancy exception 409 * @param code APR error code 410 * @throws IOException the produced exception for the given APR error number 411 */ 412 private void throwException(int code) throws IOException { 413 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")"); 414 } 415}