/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
*
* Important note about exception handling
*
* Protocol specific exceptions as well as those I/O exceptions thrown in the
* course of interaction with the session's channel are to be expected and are
* to be dealt with by specific protocol handlers. These exceptions may result in
* termination of an individual session but should not affect the I/O reactor
* and all other active sessions. There are situations, however, when the I/O
* reactor itself encounters an internal problem such as an I/O exception in
* the underlying NIO classes or an unhandled runtime exception. Those types of
* exceptions are usually fatal and will cause the I/O reactor to shut down
* automatically.
*
* There is a possibility to override this behavior and prevent I/O reactors
* from shutting down automatically in case of a runtime exception or an I/O
* exception in internal classes. This can be accomplished by providing a custom
* implementation of the {@link IOReactorExceptionHandler} interface.
*
* If an I/O reactor is unable to automatically recover from an I/O or a runtime
* exception it will enter the shutdown mode. First off, it will cancel all pending
* new session requests. Then it will attempt to close all active I/O sessions
* gracefully giving them some time to flush pending output data and terminate
* cleanly. Lastly, it will forcibly shut down those I/O sessions that still
* remain active after the grace period. This is a fairly complex process, where
* many things can fail at the same time and many different exceptions can be
* thrown in the course of the shutdown process. The I/O reactor will record all
* exceptions thrown during the shutdown process, including the original one
* that actually caused the shutdown in the first place, in an audit log. One
* can obtain the audit log using {@link #getAuditLog()}, examine exceptions
* thrown by the I/O reactor prior to and in the course of the reactor shutdown
* and decide whether it is safe to restart the I/O reactor.
*
* @since 4.0
*/
public abstract class AbstractMultiworkerIOReactor implements IOReactor {
protected final IOReactorConfig reactorConfig;
protected final Selector selector;
private final int workerCount;
private final IOEventHandlerFactory eventHandlerFactory;
private final ThreadFactory threadFactory;
private final Callback
* Subclasses must implement this method to react to the event.
*
* @param count event count.
* @throws IOReactorException in case of a non-recoverable I/O error.
*/
protected abstract void processEvents(int count) throws IOReactorException;
/**
 * Triggered to cancel pending session requests.
 *
 * Subclasses must implement this abstract method to react to the event.
 * Within this class it is invoked from the {@code finally} clause of
 * {@code doExecute()}, before the channels registered with the main
 * selector are closed.
 */
protected abstract void cancelRequests();
/**
 * Activates the main I/O reactor together with all worker I/O reactors.
 * The main reactor starts reacting to I/O events and triggering its
 * notification methods, while the worker reactors are started on their
 * own threads.
 *
 * The call enters the I/O select loop on the {@link Selector} owned by
 * this reactor and stays blocked until the reactor is shut down or the
 * executing thread is interrupted.
 *
 * The reactor is activated only if it is currently
 * {@link IOReactorStatus#INACTIVE}; in any other state this method returns
 * immediately without doing anything.
 *
 * @see #processEvents(int)
 * @see #cancelRequests()
 *
 * @throws InterruptedIOException if the dispatch thread is interrupted.
 * @throws IOReactorException in case of a non-recoverable I/O error.
 */
@Override
public void execute() throws InterruptedIOException, IOReactorException {
    final boolean activated = this.status.compareAndSet(
            IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE);
    if (!activated) {
        // The reactor has already been started (or shut down): nothing to do.
        return;
    }
    doExecute();
}
/**
* Runs the main reactor loop on the calling thread.
*
* Creates one {@code IOReactorImpl} for every slot of {@code dispatchers},
* wraps each of the first {@code workerCount} dispatchers in a {@link Worker}
* with a thread obtained from {@code threadFactory}, starts those threads and
* then repeatedly selects on the main selector. After every select the ready
* count is handed to {@link #processEvents(int)} and the workers are checked
* for a recorded exception. The loop ends when the current thread has its
* interrupt flag set or the status is no longer {@code ACTIVE}.
*
* On the way out, whatever the reason, pending requests are cancelled, the
* channels registered with the main selector are closed, the selector is
* closed, the status is set to {@code SHUT_DOWN} and threads waiting on
* {@code shutdownMutex} are notified.
*
* @throws InterruptedIOException if the select operation throws it.
* @throws IOReactorException if the selector fails, if {@code processEvents}
* fails, or if a worker has recorded an exception.
*/
private void doExecute() throws InterruptedIOException, IOReactorException {
try {
// Start I/O dispatchers
for (int i = 0; i < this.dispatchers.length; i++) {
final IOReactorImpl dispatcher = new IOReactorImpl(
this.eventHandlerFactory,
this.reactorConfig,
this.exceptionHandler,
this.sessionShutdownCallback);
this.dispatchers[i] = dispatcher;
}
// Pair every dispatcher with a worker and a (not yet started) thread
for (int i = 0; i < this.workerCount; i++) {
final IOReactorImpl dispatcher = this.dispatchers[i];
this.workers[i] = new Worker(dispatcher);
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
}
// Start the worker threads, giving up as soon as the reactor is no longer ACTIVE
for (int i = 0; i < this.workerCount; i++) {
if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0) {
return;
}
this.threads[i].start();
}
final long selectTimeout = this.reactorConfig.getSelectInterval();
while (!Thread.currentThread().isInterrupted()) {
if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0) {
break;
}
final int readyCount;
try {
readyCount = this.selector.select(selectTimeout);
} catch (final InterruptedIOException ex) {
// Propagate interruption unchanged rather than wrapping it
throw ex;
} catch (final IOException ex) {
throw new IOReactorException("Unexpected selector failure", ex);
}
// Re-check the status: it may have changed while select() was blocked
if (this.status.get().compareTo(IOReactorStatus.ACTIVE) != 0) {
break;
}
processEvents(readyCount);
// Verify I/O dispatchers
for (int i = 0; i < this.workerCount; i++) {
final Worker worker = this.workers[i];
final Exception ex = worker.getException();
if (ex != null) {
throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
}
}
}
} catch (final ClosedSelectorException ex) {
// Recorded but not rethrown
addExceptionEvent(ex);
} catch (final IOReactorException ex) {
// Record the underlying cause, if any, before propagating the failure
if (ex.getCause() != null) {
addExceptionEvent(ex.getCause());
}
throw ex;
} finally {
try {
cancelRequests();
closeActiveChannels();
try {
this.selector.close();
} catch (final IOException ex) {
addExceptionEvent(ex);
}
} finally {
// Always publish the final state and wake up threads blocked in awaitShutdown()
synchronized (this.shutdownMutex) {
this.status.set(IOReactorStatus.SHUT_DOWN);
this.shutdownMutex.notifyAll();
}
}
}
}
/**
 * Closes every channel that is still registered with the main selector.
 *
 * An {@link IOException} raised while closing an individual channel is
 * recorded through {@code addExceptionEvent} and does not stop the remaining
 * channels from being closed. If the selector is already closed the method
 * does nothing.
 */
private void closeActiveChannels() {
    // Selector#keys() throws ClosedSelectorException on a closed selector. This
    // method runs from the finally clause of doExecute(), which is also reached
    // after a ClosedSelectorException, so it must not assume an open selector.
    if (!this.selector.isOpen()) {
        return;
    }
    try {
        for (final SelectionKey key : this.selector.keys()) {
            try {
                final Channel channel = key.channel();
                if (channel != null) {
                    channel.close();
                }
            } catch (final IOException ex) {
                addExceptionEvent(ex);
            }
        }
    } catch (final ClosedSelectorException ignored) {
        // The selector was closed between the isOpen() check and keys();
        // there is no key set left to walk, so nothing more to do here.
    }
}
/**
 * Hands a newly created channel over to one of the worker I/O reactors.
 * Every call advances the internal worker cursor by one, so consecutive
 * channels go to consecutive workers.
 *
 * @param channel the new channel.
 * @param sessionRequest the session request if applicable.
 */
protected void enqueuePendingSession(final SocketChannel channel, final SessionRequestImpl sessionRequest) {
    // Advance the cursor and fold it into a valid dispatcher index.
    final int slot = Math.abs(this.currentWorker++ % this.workerCount);
    final IOReactorImpl target = this.dispatchers[slot];
    target.enqueuePendingSession(channel, sessionRequest);
}
/**
 * Registers the given channel with the main {@link Selector} of this reactor.
 *
 * @param channel the channel to register.
 * @param ops the interest ops to register the channel for.
 * @return the selection key representing the registration.
 * @throws ClosedChannelException if the channel has already been closed.
 */
protected SelectionKey registerChannel(
        final SelectableChannel channel, final int ops) throws ClosedChannelException {
    final SelectionKey registration = channel.register(this.selector, ops);
    return registration;
}
/**
 * Applies the socket options held by the reactor configuration to the
 * given {@link Socket}.
 *
 * TCP no-delay and keep-alive are always set. The socket timeout and the
 * send / receive buffer sizes are applied only when the configured value is
 * positive, and SO_LINGER is enabled only when the configured value is not
 * negative.
 *
 * @param socket the socket
 * @throws IOException in case of an I/O error.
 */
protected void prepareSocket(final Socket socket) throws IOException {
    final IOReactorConfig config = this.reactorConfig;

    socket.setTcpNoDelay(config.isTcpNoDelay());
    socket.setKeepAlive(config.isSoKeepalive());

    final int soTimeout = config.getSoTimeout();
    if (soTimeout > 0) {
        socket.setSoTimeout(soTimeout);
    }

    final int sendBufferSize = config.getSndBufSize();
    if (sendBufferSize > 0) {
        socket.setSendBufferSize(sendBufferSize);
    }

    final int receiveBufferSize = config.getRcvBufSize();
    if (receiveBufferSize > 0) {
        socket.setReceiveBufferSize(receiveBufferSize);
    }

    final int lingerSeconds = config.getSoLinger();
    if (lingerSeconds >= 0) {
        socket.setSoLinger(true, lingerSeconds);
    }
}
/**
 * Starts the shutdown of this reactor without waiting for it to finish.
 *
 * The call has an effect only if the status can be moved from
 * {@code ACTIVE} to {@code SHUTTING_DOWN}. In that case the main selector
 * is woken up and shutdown is initiated on every worker dispatcher that has
 * already been created.
 */
@Override
public void initiateShutdown() {
    final boolean transitioned = this.status.compareAndSet(
            IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN);
    if (!transitioned) {
        return;
    }
    // Unblock the select loop so that it notices the status change.
    this.selector.wakeup();
    for (int idx = 0; idx < this.workerCount; idx++) {
        final IOReactorImpl worker = this.dispatchers[idx];
        if (worker == null) {
            continue;
        }
        worker.initiateShutdown();
    }
}
/**
 * Blocks until this reactor, its worker dispatchers and their threads have
 * shut down, or until the given timeout has elapsed, whichever comes first.
 *
 * The timeout is treated as one overall deadline that is shared between the
 * three waiting phases (main reactor, dispatchers, worker threads). Returns
 * immediately if the reactor has never been started.
 *
 * NOTE(review): a timeout of zero is passed straight to
 * {@link Object#wait(long)}, which treats zero as "wait until notified";
 * confirm that this is the intended meaning.
 *
 * @param timeout the maximum time to wait.
 * @param timeUnit the unit of {@code timeout}; must not be {@code null}.
 * @throws InterruptedException if the calling thread is interrupted while waiting.
 */
@Override
public void awaitShutdown(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
    Args.notNull(timeUnit, "Time unit");
    if (this.status.get() == IOReactorStatus.INACTIVE) {
        return;
    }
    final long timeoutMs = timeUnit.toMillis(timeout);
    final long deadline = System.currentTimeMillis() + timeoutMs;
    long remaining = timeoutMs;
    // Phase 1: wait for the main reactor loop to report SHUT_DOWN.
    synchronized (this.shutdownMutex) {
        while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
            this.shutdownMutex.wait(remaining);
            remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return;
            }
        }
    }
    // Phase 2: wait for every dispatcher that has not shut down yet.
    for (int i = 0; i < this.dispatchers.length; i++) {
        final IOReactorImpl dispatcher = this.dispatchers[i];
        if (dispatcher != null) {
            if (dispatcher.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
                dispatcher.awaitShutdown(remaining, TimeUnit.MILLISECONDS);
                remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return;
                }
            }
        }
    }
    // Phase 3: wait for the worker threads to terminate.
    for (int i = 0; i < this.threads.length; i++) {
        final Thread thread = this.threads[i];
        // Thread slots are only filled once the reactor loop has set up its
        // workers, so an entry may still be null; skip it like a missing dispatcher.
        if (thread == null) {
            continue;
        }
        thread.join(remaining);
        remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
            return;
        }
    }
}
/**
 * Shuts the reactor down immediately, without any grace period.
 *
 * Sets the status to {@code SHUT_DOWN}, wakes up the main selector, forces
 * shutdown of every dispatcher that has been created and interrupts every
 * worker thread that has been created.
 */
void forceShutdown() {
    this.status.set(IOReactorStatus.SHUT_DOWN);
    this.selector.wakeup();
    for (int i = 0; i < this.dispatchers.length; i++) {
        final IOReactorImpl dispatcher = this.dispatchers[i];
        if (dispatcher != null) {
            dispatcher.forceShutdown();
        }
    }
    for (int i = 0; i < this.threads.length; i++) {
        final Thread thread = this.threads[i];
        // Thread slots are only filled once the reactor loop has set up its
        // workers, so an entry may still be null at this point.
        if (thread != null) {
            thread.interrupt();
        }
    }
}
/**
 * Shuts the reactor down, first gracefully and then forcibly.
 *
 * Initiates the shutdown, waits up to the given grace time for it to
 * complete and then forces the shutdown. The forced shutdown is performed
 * even when the wait is cut short by an interrupt, so that the reactor is
 * never left half shut down; in that case the interrupt flag of the calling
 * thread is restored. Does nothing if the reactor has never been started.
 *
 * @param graceTime the maximum time to wait for a graceful shutdown.
 * @param timeUnit the unit of {@code graceTime}; must not be {@code null}.
 */
@Override
public void shutdown(final long graceTime, final TimeUnit timeUnit) {
    Args.notNull(timeUnit, "Time unit");
    if (this.status.get() == IOReactorStatus.INACTIVE) {
        return;
    }
    initiateShutdown();
    try {
        awaitShutdown(graceTime, timeUnit);
    } catch (final InterruptedException e) {
        // Preserve the interrupt for the caller, but still fall through to
        // the forced shutdown below.
        Thread.currentThread().interrupt();
    }
    forceShutdown();
}
/**
 * Shuts the reactor down with a grace period of two seconds.
 */
public void shutdown() {
    final long graceSeconds = 2;
    shutdown(graceSeconds, TimeUnit.SECONDS);
}
/**
 * Closes the given channel on a best-effort basis.
 *
 * A {@code null} channel is ignored, and an {@link IOException} thrown by
 * {@code close()} is deliberately discarded.
 *
 * @param channel the channel to close; may be {@code null}.
 */
static void closeChannel(final Channel channel) {
    if (channel == null) {
        return;
    }
    try {
        channel.close();
    } catch (final IOException ignored) {
        // Best-effort close: there is nothing useful to do with the failure here.
    }
}
/**
 * Runnable that drives a single worker dispatcher and remembers the
 * exception, if any, that made the dispatcher's {@code execute()} call
 * terminate.
 */
static class Worker implements Runnable {

    final IOReactorImpl dispatcher;

    // Written by the worker thread, read by whoever calls getException().
    private volatile Exception failure;

    public Worker(final IOReactorImpl dispatcher) {
        this.dispatcher = dispatcher;
    }

    /**
     * Executes the dispatcher on the current thread and stores any
     * {@link Exception} it throws instead of propagating it.
     */
    @Override
    public void run() {
        try {
            this.dispatcher.execute();
        } catch (final Exception cause) {
            this.failure = cause;
        }
    }

    /**
     * @return the exception that terminated the dispatcher, or {@code null}
     * if none has been recorded.
     */
    public Exception getException() {
        return this.failure;
    }
}
/**
 * Thread factory that names its threads {@code "I/O dispatcher N"}, where
 * {@code N} comes from a counter shared by all instances and starting at 1.
 */
static class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicLong COUNT = new AtomicLong(1);

    /**
     * Creates a new, not yet started thread for the given task.
     *
     * @param r the task the thread will run.
     * @return the newly created thread.
     */
    @Override
    public Thread newThread(final Runnable r) {
        final long sequence = COUNT.getAndIncrement();
        final String threadName = "I/O dispatcher " + sequence;
        return new Thread(r, threadName);
    }
}
}