001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.socket.apr; 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.net.SocketAddress; 025import java.nio.channels.spi.SelectorProvider; 026import java.util.Iterator; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.Executor; 030 031import org.apache.mina.core.RuntimeIoException; 032import org.apache.mina.core.polling.AbstractPollingIoAcceptor; 033import org.apache.mina.core.service.IoAcceptor; 034import org.apache.mina.core.service.IoProcessor; 035import org.apache.mina.core.service.IoService; 036import org.apache.mina.core.service.SimpleIoProcessorPool; 037import org.apache.mina.core.service.TransportMetadata; 038import org.apache.mina.transport.socket.DefaultSocketSessionConfig; 039import org.apache.tomcat.jni.Address; 040import org.apache.tomcat.jni.Poll; 041import org.apache.tomcat.jni.Pool; 042import org.apache.tomcat.jni.Socket; 043import org.apache.tomcat.jni.Status; 044 045/** 046 * {@link IoAcceptor} for APR based socket transport (TCP/IP). 047 * 048 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 049 */ 050public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> { 051 /** 052 * This constant is deduced from the APR code. It is used when the timeout 053 * has expired while doing a poll() operation. 054 */ 055 private static final int APR_TIMEUP_ERROR = -120001; 056 057 private static final int POLLSET_SIZE = 1024; 058 059 private final Object wakeupLock = new Object(); 060 061 private volatile long wakeupSocket; 062 063 private volatile boolean toBeWakenUp; 064 065 private volatile long pool; 066 067 private volatile long pollset; // socket poller 068 069 private final long[] polledSockets = new long[POLLSET_SIZE << 1]; 070 071 private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>(); 072 073 /** 074 * Constructor for {@link AprSocketAcceptor} using default parameters (multiple thread model). 075 */ 076 public AprSocketAcceptor() { 077 super(new DefaultSocketSessionConfig(), AprIoProcessor.class); 078 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 079 } 080 081 /** 082 * Constructor for {@link AprSocketAcceptor} using default parameters, and 083 * given number of {@link AprIoProcessor} for multithreading I/O operations. 084 * 085 * @param processorCount the number of processor to create and place in a 086 * {@link SimpleIoProcessorPool} 087 */ 088 public AprSocketAcceptor(int processorCount) { 089 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount); 090 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 091 } 092 093 /** 094 * Constructor for {@link AprSocketAcceptor} with default configuration but a 095 * specific {@link AprIoProcessor}, useful for sharing the same processor over multiple 096 * {@link IoService} of the same type. 097 * @param processor the processor to use for managing I/O events 098 */ 099 public AprSocketAcceptor(IoProcessor<AprSession> processor) { 100 super(new DefaultSocketSessionConfig(), processor); 101 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 102 } 103 104 /** 105 * Constructor for {@link AprSocketAcceptor} with a given {@link Executor} for handling 106 * connection events and a given {@link AprIoProcessor} for handling I/O events, useful for 107 * sharing the same processor and executor over multiple {@link IoService} of the same type. 108 * @param executor the executor for connection 109 * @param processor the processor for I/O operations 110 */ 111 public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) { 112 super(new DefaultSocketSessionConfig(), executor, processor); 113 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 114 } 115 116 /** 117 * {@inheritDoc} 118 */ 119 @Override 120 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception { 121 long s = Socket.accept(handle); 122 boolean success = false; 123 try { 124 AprSession result = new AprSocketSession(this, processor, s); 125 success = true; 126 return result; 127 } finally { 128 if (!success) { 129 Socket.close(s); 130 } 131 } 132 } 133 134 /** 135 * {@inheritDoc} 136 */ 137 @Override 138 protected Long open(SocketAddress localAddress) throws Exception { 139 InetSocketAddress la = (InetSocketAddress) localAddress; 140 long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool); 141 142 boolean success = false; 143 try { 144 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1); 145 if (result != Status.APR_SUCCESS) { 146 throwException(result); 147 } 148 result = Socket.timeoutSet(handle, 0); 149 if (result != Status.APR_SUCCESS) { 150 throwException(result); 151 } 152 153 // Configure the server socket, 154 result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0); 155 if (result != Status.APR_SUCCESS) { 156 throwException(result); 157 } 158 result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize()); 159 if (result != Status.APR_SUCCESS) { 160 throwException(result); 161 } 162 163 // and bind. 164 long sa; 165 if (la != null) { 166 if (la.getAddress() == null) { 167 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool); 168 } else { 169 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool); 170 } 171 } else { 172 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool); 173 } 174 175 result = Socket.bind(handle, sa); 176 if (result != Status.APR_SUCCESS) { 177 throwException(result); 178 } 179 result = Socket.listen(handle, getBacklog()); 180 if (result != Status.APR_SUCCESS) { 181 throwException(result); 182 } 183 184 result = Poll.add(pollset, handle, Poll.APR_POLLIN); 185 if (result != Status.APR_SUCCESS) { 186 throwException(result); 187 } 188 success = true; 189 } finally { 190 if (!success) { 191 close(handle); 192 } 193 } 194 return handle; 195 } 196 197 /** 198 * {@inheritDoc} 199 */ 200 @Override 201 protected void init() throws Exception { 202 // initialize a memory pool for APR functions 203 pool = Pool.create(AprLibrary.getInstance().getRootPool()); 204 205 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool); 206 207 pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 208 209 if (pollset <= 0) { 210 pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 211 } 212 213 if (pollset <= 0) { 214 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) { 215 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform."); 216 } 217 } 218 } 219 220 /** 221 * {@inheritDoc} 222 */ 223 @Override 224 protected void destroy() throws Exception { 225 if (wakeupSocket > 0) { 226 Socket.close(wakeupSocket); 227 } 228 if (pollset > 0) { 229 Poll.destroy(pollset); 230 } 231 if (pool > 0) { 232 Pool.destroy(pool); 233 } 234 } 235 236 /** 237 * {@inheritDoc} 238 */ 239 @Override 240 protected SocketAddress localAddress(Long handle) throws Exception { 241 long la = Address.get(Socket.APR_LOCAL, handle); 242 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port); 243 } 244 245 /** 246 * {@inheritDoc} 247 */ 248 @Override 249 protected int select() throws Exception { 250 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false); 251 if (rv <= 0) { 252 // We have had an error. It can simply be that we have reached 253 // the timeout (very unlikely, as we have set it to MAX_INTEGER) 254 if (rv != APR_TIMEUP_ERROR) { 255 // It's not a timeout being exceeded. Throw the error 256 throwException(rv); 257 } 258 259 rv = Poll.maintain(pollset, polledSockets, true); 260 if (rv > 0) { 261 for (int i = 0; i < rv; i++) { 262 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN); 263 } 264 } else if (rv < 0) { 265 throwException(rv); 266 } 267 268 return 0; 269 } else { 270 rv <<= 1; 271 if (!polledHandles.isEmpty()) { 272 polledHandles.clear(); 273 } 274 275 for (int i = 0; i < rv; i++) { 276 long flag = polledSockets[i]; 277 long socket = polledSockets[++i]; 278 if (socket == wakeupSocket) { 279 synchronized (wakeupLock) { 280 Poll.remove(pollset, wakeupSocket); 281 toBeWakenUp = false; 282 } 283 continue; 284 } 285 286 if ((flag & Poll.APR_POLLIN) != 0) { 287 polledHandles.add(socket); 288 } 289 } 290 return polledHandles.size(); 291 } 292 } 293 294 /** 295 * {@inheritDoc} 296 */ 297 @Override 298 protected Iterator<Long> selectedHandles() { 299 return polledHandles.iterator(); 300 } 301 302 /** 303 * {@inheritDoc} 304 */ 305 @Override 306 protected void close(Long handle) throws Exception { 307 Poll.remove(pollset, handle); 308 int result = Socket.close(handle); 309 if (result != Status.APR_SUCCESS) { 310 throwException(result); 311 } 312 } 313 314 /** 315 * {@inheritDoc} 316 */ 317 @Override 318 protected void wakeup() { 319 if (toBeWakenUp) { 320 return; 321 } 322 323 // Add a dummy socket to the pollset. 324 synchronized (wakeupLock) { 325 toBeWakenUp = true; 326 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT); 327 } 328 } 329 330 /** 331 * {@inheritDoc} 332 */ 333 @Override 334 public InetSocketAddress getLocalAddress() { 335 return (InetSocketAddress) super.getLocalAddress(); 336 } 337 338 /** 339 * {@inheritDoc} 340 */ 341 @Override 342 public InetSocketAddress getDefaultLocalAddress() { 343 return (InetSocketAddress) super.getDefaultLocalAddress(); 344 } 345 346 /** 347 * @see #setDefaultLocalAddress(SocketAddress) 348 * 349 * @param localAddress The localAddress to set 350 */ 351 public void setDefaultLocalAddress(InetSocketAddress localAddress) { 352 super.setDefaultLocalAddress(localAddress); 353 } 354 355 /** 356 * {@inheritDoc} 357 */ 358 public TransportMetadata getTransportMetadata() { 359 return AprSocketSession.METADATA; 360 } 361 362 /** 363 * Convert an APR code into an Exception with the corresponding message 364 * @param code error number 365 * @throws IOException the generated exception 366 */ 367 private void throwException(int code) throws IOException { 368 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")"); 369 } 370 371 @Override 372 protected void init(SelectorProvider selectorProvider) throws Exception { 373 init(); 374 } 375}