001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.socket.apr; 021 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.Executor; 030 031import org.apache.mina.core.RuntimeIoException; 032import org.apache.mina.core.buffer.IoBuffer; 033import org.apache.mina.core.file.FileRegion; 034import org.apache.mina.core.polling.AbstractPollingIoProcessor; 035import org.apache.mina.core.session.SessionState; 036import org.apache.tomcat.jni.File; 037import org.apache.tomcat.jni.Poll; 038import org.apache.tomcat.jni.Pool; 039import org.apache.tomcat.jni.Socket; 040import org.apache.tomcat.jni.Status; 041 042/** 043 * The class in charge of processing socket level IO events for the 044 * {@link AprSocketConnector} 045 * 046 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 047 */ 048public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> { 049 private static final int POLLSET_SIZE = 1024; 050 051 private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE); 052 053 private final Object wakeupLock = new Object(); 054 055 private final long wakeupSocket; 056 057 private volatile boolean toBeWakenUp; 058 059 private final long pool; 060 061 private final long bufferPool; // memory pool 062 063 private final long pollset; // socket poller 064 065 private final long[] polledSockets = new long[POLLSET_SIZE << 1]; 066 067 private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>(); 068 069 /** 070 * Create a new instance of {@link AprIoProcessor} with a given Exector for 071 * handling I/Os events. 072 * 073 * @param executor 074 * the {@link Executor} for handling I/O events 075 */ 076 public AprIoProcessor(Executor executor) { 077 super(executor); 078 079 // initialize a memory pool for APR functions 080 pool = Pool.create(AprLibrary.getInstance().getRootPool()); 081 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool()); 082 083 try { 084 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool); 085 } catch (RuntimeException e) { 086 throw e; 087 } catch (Error e) { 088 throw e; 089 } catch (Exception e) { 090 throw new RuntimeIoException("Failed to create a wakeup socket.", e); 091 } 092 093 boolean success = false; 094 long newPollset; 095 try { 096 newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 097 098 if (newPollset == 0) { 099 newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE); 100 } 101 102 pollset = newPollset; 103 if (pollset < 0) { 104 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) { 105 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform."); 106 } 107 } 108 success = true; 109 } catch (RuntimeException e) { 110 throw e; 111 } catch (Error e) { 112 throw e; 113 } catch (Exception e) { 114 throw new RuntimeIoException("Failed to create a pollset.", e); 115 } finally { 116 if (!success) { 117 dispose(); 118 } 119 } 120 } 121 122 /** 123 * {@inheritDoc} 124 */ 125 @Override 126 protected void doDispose() { 127 Poll.destroy(pollset); 128 Socket.close(wakeupSocket); 129 Pool.destroy(bufferPool); 130 Pool.destroy(pool); 131 } 132 133 /** 134 * {@inheritDoc} 135 */ 136 @Override 137 protected int select() throws Exception { 138 return select(Integer.MAX_VALUE); 139 } 140 141 /** 142 * {@inheritDoc} 143 */ 144 @Override 145 protected int select(long timeout) throws Exception { 146 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false); 147 if (rv <= 0) { 148 if (rv != -120001) { 149 throwException(rv); 150 } 151 152 rv = Poll.maintain(pollset, polledSockets, true); 153 if (rv > 0) { 154 for (int i = 0; i < rv; i++) { 155 long socket = polledSockets[i]; 156 AprSession session = allSessions.get(socket); 157 if (session == null) { 158 continue; 159 } 160 161 int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) 162 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0); 163 164 Poll.add(pollset, socket, flag); 165 } 166 } else if (rv < 0) { 167 throwException(rv); 168 } 169 170 return 0; 171 } else { 172 rv <<= 1; 173 if (!polledSessions.isEmpty()) { 174 polledSessions.clear(); 175 } 176 for (int i = 0; i < rv; i++) { 177 long flag = polledSockets[i]; 178 long socket = polledSockets[++i]; 179 if (socket == wakeupSocket) { 180 synchronized (wakeupLock) { 181 Poll.remove(pollset, wakeupSocket); 182 toBeWakenUp = false; 183 wakeupCalled.set(true); 184 } 185 continue; 186 } 187 AprSession session = allSessions.get(socket); 188 if (session == null) { 189 continue; 190 } 191 192 session.setReadable((flag & Poll.APR_POLLIN) != 0); 193 session.setWritable((flag & Poll.APR_POLLOUT) != 0); 194 195 polledSessions.add(session); 196 } 197 198 return polledSessions.size(); 199 } 200 } 201 202 /** 203 * {@inheritDoc} 204 */ 205 @Override 206 protected boolean isSelectorEmpty() { 207 return allSessions.isEmpty(); 208 } 209 210 /** 211 * {@inheritDoc} 212 */ 213 @Override 214 protected void wakeup() { 215 if (toBeWakenUp) { 216 return; 217 } 218 219 // Add a dummy socket to the pollset. 220 synchronized (wakeupLock) { 221 toBeWakenUp = true; 222 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT); 223 } 224 } 225 226 /** 227 * {@inheritDoc} 228 */ 229 @Override 230 protected Iterator<AprSession> allSessions() { 231 return allSessions.values().iterator(); 232 } 233 234 /** 235 * {@inheritDoc} 236 */ 237 @Override 238 protected Iterator<AprSession> selectedSessions() { 239 return polledSessions.iterator(); 240 } 241 242 @Override 243 protected void init(AprSession session) throws Exception { 244 long s = session.getDescriptor(); 245 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1); 246 Socket.timeoutSet(s, 0); 247 248 int rv = Poll.add(pollset, s, Poll.APR_POLLIN); 249 if (rv != Status.APR_SUCCESS) { 250 throwException(rv); 251 } 252 253 session.setInterestedInRead(true); 254 allSessions.put(s, session); 255 } 256 257 /** 258 * {@inheritDoc} 259 */ 260 @Override 261 protected void destroy(AprSession session) throws Exception { 262 if (allSessions.remove(session.getDescriptor()) == null) { 263 // Already destroyed. 264 return; 265 } 266 267 int ret = Poll.remove(pollset, session.getDescriptor()); 268 try { 269 if (ret != Status.APR_SUCCESS) { 270 throwException(ret); 271 } 272 } finally { 273 ret = Socket.close(session.getDescriptor()); 274 275 // destroying the session because it won't be reused 276 // after this point 277 Socket.destroy(session.getDescriptor()); 278 session.setDescriptor(0); 279 280 if (ret != Status.APR_SUCCESS) { 281 throwException(ret); 282 } 283 } 284 } 285 286 /** 287 * {@inheritDoc} 288 */ 289 @Override 290 protected SessionState getState(AprSession session) { 291 long socket = session.getDescriptor(); 292 293 if (socket != 0) { 294 return SessionState.OPENED; 295 } else if (allSessions.get(socket) != null) { 296 return SessionState.OPENING; // will occur ? 297 } else { 298 return SessionState.CLOSING; 299 } 300 } 301 302 /** 303 * {@inheritDoc} 304 */ 305 @Override 306 protected boolean isReadable(AprSession session) { 307 return session.isReadable(); 308 } 309 310 /** 311 * {@inheritDoc} 312 */ 313 @Override 314 protected boolean isWritable(AprSession session) { 315 return session.isWritable(); 316 } 317 318 /** 319 * {@inheritDoc} 320 */ 321 @Override 322 protected boolean isInterestedInRead(AprSession session) { 323 return session.isInterestedInRead(); 324 } 325 326 /** 327 * {@inheritDoc} 328 */ 329 @Override 330 protected boolean isInterestedInWrite(AprSession session) { 331 return session.isInterestedInWrite(); 332 } 333 334 /** 335 * {@inheritDoc} 336 */ 337 @Override 338 protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception { 339 if (session.isInterestedInRead() == isInterested) { 340 return; 341 } 342 343 int rv = Poll.remove(pollset, session.getDescriptor()); 344 345 if (rv != Status.APR_SUCCESS) { 346 throwException(rv); 347 } 348 349 int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0); 350 351 rv = Poll.add(pollset, session.getDescriptor(), flags); 352 353 if (rv == Status.APR_SUCCESS) { 354 session.setInterestedInRead(isInterested); 355 } else { 356 throwException(rv); 357 } 358 } 359 360 /** 361 * {@inheritDoc} 362 */ 363 @Override 364 protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception { 365 if (session.isInterestedInWrite() == isInterested) { 366 return; 367 } 368 369 int rv = Poll.remove(pollset, session.getDescriptor()); 370 371 if (rv != Status.APR_SUCCESS) { 372 throwException(rv); 373 } 374 375 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0); 376 377 rv = Poll.add(pollset, session.getDescriptor(), flags); 378 379 if (rv == Status.APR_SUCCESS) { 380 session.setInterestedInWrite(isInterested); 381 } else { 382 throwException(rv); 383 } 384 } 385 386 /** 387 * {@inheritDoc} 388 */ 389 @Override 390 protected int read(AprSession session, IoBuffer buffer) throws Exception { 391 int bytes; 392 int capacity = buffer.remaining(); 393 // Using Socket.recv() directly causes memory leak. :-( 394 ByteBuffer b = Pool.alloc(bufferPool, capacity); 395 396 try { 397 bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity); 398 399 if (bytes > 0) { 400 b.position(0); 401 b.limit(bytes); 402 buffer.put(b); 403 } else if (bytes < 0) { 404 if (Status.APR_STATUS_IS_EOF(-bytes)) { 405 bytes = -1; 406 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) { 407 bytes = 0; 408 } else { 409 throwException(bytes); 410 } 411 } 412 } finally { 413 Pool.clear(bufferPool); 414 } 415 416 return bytes; 417 } 418 419 /** 420 * {@inheritDoc} 421 */ 422 @Override 423 protected int write(AprSession session, IoBuffer buf, int length) throws IOException { 424 int writtenBytes; 425 if (buf.isDirect()) { 426 writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length); 427 } else { 428 writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length); 429 if (writtenBytes > 0) { 430 buf.skip(writtenBytes); 431 } 432 } 433 434 if (writtenBytes < 0) { 435 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) { 436 writtenBytes = 0; 437 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) { 438 writtenBytes = 0; 439 } else { 440 throwException(writtenBytes); 441 } 442 } 443 return writtenBytes; 444 } 445 446 /** 447 * {@inheritDoc} 448 */ 449 @Override 450 protected int transferFile(AprSession session, FileRegion region, int length) throws Exception { 451 if (region.getFilename() == null) { 452 throw new UnsupportedOperationException(); 453 } 454 455 long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED 456 | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor())); 457 long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0); 458 File.close(fd); 459 460 if (numWritten < 0) { 461 if (numWritten == -Status.EAGAIN) { 462 return 0; 463 } 464 throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten 465 + ")"); 466 } 467 return (int) numWritten; 468 } 469 470 private void throwException(int code) throws IOException { 471 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")"); 472 } 473 474 /** 475 * {@inheritDoc} 476 */ 477 @Override 478 protected void registerNewSelector() { 479 // Do nothing 480 } 481 482 /** 483 * {@inheritDoc} 484 */ 485 @Override 486 protected boolean isBrokenConnection() throws IOException { 487 // Here, we assume that this is the case. 488 return true; 489 } 490}