sessionShutdownCallback) {
super();
this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
this.exceptionHandler = exceptionHandler;
this.sessionShutdownCallback = sessionShutdownCallback;
this.shutdownInitiated = new AtomicBoolean(false);
this.closedSessions = new ConcurrentLinkedQueue<>();
this.pendingSessions = new ConcurrentLinkedQueue<>();
try {
this.selector = Selector.open();
} catch (final IOException ex) {
throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
}
this.shutdownMutex = new Object();
this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
}
/**
 * Reports the lifecycle state this reactor is currently in.
 *
 * @return the value presently held by the status reference.
 */
@Override
public IOReactorStatus getStatus() {
    final IOReactorStatus current = this.status.get();
    return current;
}
/**
 * Closes the given resource on a best-effort basis.
 *
 * @param closeable the resource to close; {@code null} is accepted and ignored.
 */
private void closeQuietly(final Closeable closeable) {
    if (closeable == null) {
        return;
    }
    try {
        closeable.close();
    } catch (final IOException ignored) {
        // Deliberately not propagated: callers use this on clean-up paths only.
    }
}
/**
 * Cancels the given operation if one is supplied.
 *
 * @param cancellable the operation to cancel; {@code null} is accepted and ignored.
 */
private void cancelQuietly(final Cancellable cancellable) {
    if (cancellable == null) {
        return;
    }
    cancellable.cancel();
}
/**
 * Queues a channel for registration with this reactor and wakes the selector
 * so that the queue gets looked at without waiting for the select timeout.
 *
 * @param socketChannel the channel to hand over; must not be {@code null}.
 * @param sessionRequest the request associated with the channel; may be {@code null}.
 */
void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
    Args.notNull(socketChannel, "SocketChannel");
    final PendingSession entry = new PendingSession(socketChannel, sessionRequest);
    this.pendingSessions.add(entry);
    this.selector.wakeup();
}
/**
 * Activates the I/O reactor. The I/O reactor will start reacting to I/O
 * events and dispatch I/O event notifications to the {@link IOEventHandler}
 * associated with the given I/O session.
 *
 * This method enters the I/O select loop on the {@link Selector} instance
 * associated with this I/O reactor and remains blocked until the I/O reactor
 * is shut down or the execution thread is interrupted.
 *
 * The call only has an effect when the reactor is still
 * {@link IOReactorStatus#INACTIVE}; in any other state it returns immediately.
 *
 * @throws InterruptedIOException if the dispatch thread is interrupted.
 * @throws IOReactorException in case of a non-recoverable I/O error.
 */
@Override
public void execute() throws InterruptedIOException, IOReactorException {
    final boolean activated = this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE);
    if (!activated) {
        return;
    }
    doExecute();
}
/**
 * Runs the select loop of this reactor on the calling thread and performs the
 * final clean-up once the loop is left.
 *
 * Each pass waits on the selector for at most the configured select interval,
 * then reacts to the current status, dispatches ready keys, runs the timeout
 * validation, drains the closed-session queue and, while the reactor is
 * ACTIVE, registers pending sessions.
 *
 * Whatever way the loop ends, the status is set to SHUT_DOWN and all threads
 * waiting on the shutdown mutex are notified.
 *
 * @throws InterruptedIOException re-thrown unchanged when the select call raises it.
 * @throws IOReactorException wrapping any other {@link IOException} raised by the select call.
 */
private void doExecute() throws InterruptedIOException, IOReactorException {
final long selectTimeout = this.reactorConfig.getSelectInterval();
try {
// Leaving the loop because of the interrupt flag does not raise an exception here;
// control simply falls through to the finally block below.
while (!Thread.currentThread().isInterrupted()) {
final int readyCount;
try {
readyCount = this.selector.select(selectTimeout);
} catch (final InterruptedIOException ex) {
throw ex;
} catch (final IOException ex) {
throw new IOReactorException("Unexpected selector failure", ex);
}
// Status is SHUTTING_DOWN or any constant declared after it in the enum.
if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
// The compare-and-set makes sure the session shutdown callback pass runs only once.
if (this.shutdownInitiated.compareAndSet(false, true)) {
initiateSessionShutdown();
}
// Sessions that were queued but never registered are discarded on every pass.
closePendingSessions();
}
// A forced shutdown skips event processing altogether.
if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
break;
}
// Process selected I/O events
if (readyCount > 0) {
processEvents(this.selector.selectedKeys());
}
validateActiveChannels();
// Process closed sessions
processClosedSessions();
// If active process new channels
if (this.status.get().compareTo(IOReactorStatus.ACTIVE) == 0) {
processPendingSessions();
}
// Exit select loop if graceful shutdown has been completed
if (this.status.get().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0
&& this.selector.keys().isEmpty()) {
this.status.set(IOReactorStatus.SHUT_DOWN);
}
if (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
break;
}
}
} catch (final ClosedSelectorException ignore) {
// A closed selector ends the loop; clean-up continues in the finally block.
} finally {
try {
closePendingSessions();
closeActiveChannels();
processClosedSessions();
} finally {
// Runs even if the clean-up above throws, so that awaitShutdown callers are released.
this.status.set(IOReactorStatus.SHUT_DOWN);
synchronized (this.shutdownMutex) {
this.shutdownMutex.notifyAll();
}
}
}
}
/**
 * Passes every session currently attached to a selector key to the session
 * shutdown callback. Does nothing when no callback has been configured.
 * Keys without an attachment are skipped.
 */
private void initiateSessionShutdown() {
    if (this.sessionShutdownCallback == null) {
        return;
    }
    // Iterate the typed key set directly; a raw Set cannot be iterated as SelectionKey.
    for (final SelectionKey key : this.selector.keys()) {
        final ManagedIOSession session = (ManagedIOSession) key.attachment();
        if (session != null) {
            this.sessionShutdownCallback.execute(session);
        }
    }
}
/**
 * Runs the timeout check over all registered keys, but no more often than
 * once per configured select interval.
 */
private void validateActiveChannels() {
    final long now = System.currentTimeMillis();
    final long elapsed = now - this.lastTimeoutCheck;
    if (elapsed < this.reactorConfig.getSelectInterval()) {
        // Checked recently enough; skip this pass.
        return;
    }
    this.lastTimeoutCheck = now;
    for (final SelectionKey key : this.selector.keys()) {
        timeoutCheck(key, now);
    }
}
/**
 * Dispatches every selected key to {@code processEvent} and then empties the
 * set so that the same keys are not reported again by the next select call.
 *
 * @param selectedKeys the selector's selected-key set; cleared by this method.
 */
private void processEvents(final Set<SelectionKey> selectedKeys) {
    // The element type must be declared: a raw Set yields Object and cannot be
    // iterated as SelectionKey.
    for (final SelectionKey key : selectedKeys) {
        processEvent(key);
    }
    selectedKeys.clear();
}
/**
 * Offers a runtime exception to the configured exception handler.
 *
 * @param ex the exception to handle.
 * @throws RuntimeException the same {@code ex}, when there is no handler or
 *         the handler does not report it as handled.
 */
private void handleRuntimeException(final RuntimeException ex) {
    final boolean handled = this.exceptionHandler != null && this.exceptionHandler.handle(ex);
    if (!handled) {
        throw ex;
    }
}
/**
 * Dispatches the readiness state of a single key to its attached session:
 * input first, then output. The access time is refreshed before each callback.
 *
 * Any runtime exception shuts the session down. Exceptions other than
 * {@link CancelledKeyException} are additionally passed to
 * {@code handleRuntimeException}.
 *
 * @param key the selected key whose attachment is the session to notify.
 */
private void processEvent(final SelectionKey key) {
    final ManagedIOSession ioSession = (ManagedIOSession) key.attachment();
    try {
        if (key.isReadable()) {
            ioSession.updateAccessTime();
            ioSession.onInputReady();
        }
        if (key.isWritable()) {
            ioSession.updateAccessTime();
            ioSession.onOutputReady();
        }
    } catch (final RuntimeException ex) {
        ioSession.shutdown();
        // A cancelled key only means the session is gone; it is not reported further.
        if (!(ex instanceof CancelledKeyException)) {
            handleRuntimeException(ex);
        }
    }
}
/**
 * Drains the pending-session queue, registering each channel with the
 * selector for read interest and wrapping it in a managed session.
 *
 * For every successfully registered channel the session request (if any) is
 * completed and the session's connect callback is invoked.
 *
 * If registration fails because the channel is already closed, the session
 * request (if any) is failed and the method returns, leaving any remaining
 * queue entries for a later call.
 *
 * @throws IOReactorException if configuring or registering a channel fails
 *         with an I/O error other than a closed channel.
 */
private void processPendingSessions() throws IOReactorException {
    for (;;) {
        final PendingSession pending = this.pendingSessions.poll();
        if (pending == null) {
            return;
        }
        final SocketChannel channel = pending.socketChannel;
        final SessionRequestImpl request = pending.sessionRequest;

        final ManagedIOSession session;
        try {
            channel.configureBlocking(false);
            final SelectionKey key = channel.register(this.selector, SelectionKey.OP_READ);
            session = new ManagedIOSession(
                    request != null ? request.getRemoteEndpoint() : null,
                    new IOSessionImpl(key, channel),
                    closedSessions);
            session.setHandler(this.eventHandlerFactory.createHandler(session));
            session.setSocketTimeout(this.reactorConfig.getSoTimeout());
            // Attach last, once the session is fully set up.
            key.attach(session);
        } catch (final ClosedChannelException ex) {
            if (request != null) {
                request.failed(ex);
            }
            return;
        } catch (final IOException ex) {
            throw new IOReactorException("Failure registering channel with the selector", ex);
        }

        try {
            if (request != null) {
                request.completed(session);
            }
            try {
                session.onConnected();
            } catch (final RuntimeException ex) {
                // May rethrow; a rethrown CancelledKeyException is caught just below.
                handleRuntimeException(ex);
            }
        } catch (final CancelledKeyException ex) {
            session.shutdown();
        }
    }
}
/**
 * Drains the closed-session queue and fires the disconnect callback on each
 * session taken from it. A {@link CancelledKeyException} from the callback is
 * ignored; any other runtime exception goes to {@code handleRuntimeException}.
 */
private void processClosedSessions() {
    ManagedIOSession closed;
    while ((closed = this.closedSessions.poll()) != null) {
        try {
            closed.onDisconnected();
        } catch (final CancelledKeyException ignored) {
            // ignore and move on
        } catch (final RuntimeException ex) {
            handleRuntimeException(ex);
        }
    }
}
/**
 * Fires the timeout callback on the session attached to the key when the
 * session has a positive socket timeout and its last access time plus that
 * timeout lies before {@code now}. Keys without an attachment are skipped.
 *
 * Any runtime exception shuts the session down. Exceptions other than
 * {@link CancelledKeyException} are additionally passed to
 * {@code handleRuntimeException}.
 *
 * @param key the key whose attached session is checked.
 * @param now the reference time, in the same units as the session's last access time.
 */
private void timeoutCheck(final SelectionKey key, final long now) {
    final ManagedIOSession ioSession = (ManagedIOSession) key.attachment();
    if (ioSession == null) {
        return;
    }
    try {
        final int timeout = ioSession.getSocketTimeout();
        final boolean expired = timeout > 0 && ioSession.getLastAccessTime() + timeout < now;
        if (expired) {
            ioSession.onTimeout();
        }
    } catch (final CancelledKeyException ex) {
        ioSession.shutdown();
    } catch (final RuntimeException ex) {
        ioSession.shutdown();
        handleRuntimeException(ex);
    }
}
/**
 * Discards every entry of the pending-session queue: the session request is
 * cancelled and the socket channel is closed, both on a best-effort basis.
 */
private void closePendingSessions() {
    PendingSession pending;
    while ((pending = this.pendingSessions.poll()) != null) {
        cancelQuietly(pending.sessionRequest);
        closeQuietly(pending.socketChannel);
    }
}
/**
 * Closes every session attached to a selector key and then the selector
 * itself, all on a best-effort basis.
 *
 * If the selector has already been closed its key set can no longer be
 * obtained; in that case the session pass is skipped.
 */
private void closeActiveChannels() {
    try {
        // Iterate the typed key set directly; a raw Set cannot be iterated as SelectionKey.
        for (final SelectionKey key : this.selector.keys()) {
            final ManagedIOSession session = (ManagedIOSession) key.attachment();
            closeQuietly(session);
        }
    } catch (final ClosedSelectorException ignored) {
        // Selector.keys() throws on a closed selector. This method runs from the
        // clean-up path that follows a ClosedSelectorException in the select loop,
        // so the exception must not escape from here.
    }
    closeQuietly(this.selector);
}
/**
 * Blocks the caller until the reactor status has reached SHUT_DOWN or the
 * given time has elapsed, whichever comes first.
 *
 * The wait is done on the shutdown mutex via {@link Object#wait(long)}, so a
 * timeout that converts to zero milliseconds waits until the first
 * notification rather than returning at once.
 *
 * @param timeout the maximum time to wait.
 * @param timeUnit the unit of {@code timeout}; must not be {@code null}.
 * @throws InterruptedException if the calling thread is interrupted while waiting.
 */
@Override
public void awaitShutdown(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
    Args.notNull(timeUnit, "Time unit");
    final long waitMillis = timeUnit.toMillis(timeout);
    final long deadline = System.currentTimeMillis() + waitMillis;
    synchronized (this.shutdownMutex) {
        long remaining = waitMillis;
        while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
            this.shutdownMutex.wait(remaining);
            // Recompute after every wake-up so that early wake-ups do not extend the wait.
            remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
        }
    }
}
/**
 * Requests a graceful shutdown. Only an ACTIVE reactor is moved to
 * SHUTTING_DOWN; on a successful transition the selector is woken up so that
 * the select loop notices the new status. In any other state the call does
 * nothing.
 */
@Override
public void initiateShutdown() {
    final boolean transitioned =
            this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN);
    if (transitioned) {
        this.selector.wakeup();
    }
}
/**
 * Unconditionally marks the reactor as SHUT_DOWN and wakes the selector so
 * that a blocked select call returns.
 */
void forceShutdown() {
    final IOReactorStatus terminal = IOReactorStatus.SHUT_DOWN;
    // The status must be published before the wake-up.
    this.status.set(terminal);
    this.selector.wakeup();
}
/**
 * Shuts the reactor down: requests a graceful shutdown, waits up to the given
 * grace time for it to complete and then forces the shutdown.
 *
 * If the calling thread is interrupted while waiting, the wait is cut short,
 * the thread's interrupt flag is restored and the shutdown is still forced,
 * so the reactor is not left in SHUTTING_DOWN.
 *
 * @param graceTime the maximum time to wait for a graceful shutdown.
 * @param timeUnit the unit of {@code graceTime}.
 */
@Override
public void shutdown(final long graceTime, final TimeUnit timeUnit) {
    initiateShutdown();
    try {
        awaitShutdown(graceTime, timeUnit);
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
    // Outside the try block on purpose: this must also run after an interrupted wait.
    forceShutdown();
}
/**
 * Immutable pairing of a socket channel that is waiting to be registered with
 * the selector and the session request (possibly {@code null}) it came from.
 */
private static class PendingSession {

    /** Channel awaiting registration. */
    final SocketChannel socketChannel;

    /** Request that produced the channel; may be {@code null}. */
    final SessionRequestImpl sessionRequest;

    private PendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
        this.sessionRequest = sessionRequest;
        this.socketChannel = socketChannel;
    }

}
}