();
this.timeoutCheckInterval = selectTimeout;
this.lastTimeoutCheck = System.currentTimeMillis();
}
/**
 * Activates the I/O reactor. Once activated, the reactor starts reacting to
 * I/O events and dispatches I/O event notifications to the given
 * {@link IOEventDispatch}.
 *
 * @param eventDispatch the I/O event dispatch to notify; must not be
 *        {@code null}.
 * @throws IllegalArgumentException if {@code eventDispatch} is {@code null}.
 * @throws InterruptedIOException if the dispatch thread is interrupted.
 * @throws IOReactorException in case of a non-recoverable I/O error.
 */
public void execute(final IOEventDispatch eventDispatch)
        throws InterruptedIOException, IOReactorException {
    if (eventDispatch == null) {
        throw new IllegalArgumentException("Event dispatcher may not be null");
    }
    // Store the dispatcher first: the event callbacks of this class read it
    // from the field while the no-argument execute() is running.
    this.eventDispatch = eventDispatch;
    execute();
}
/**
* Sets the exception handler for this I/O reactor. The handler is consulted
* by {@link #handleRuntimeException(RuntimeException)} whenever a runtime
* exception needs to be handled.
*
* @param exceptionHandler the exception handler; may be <code>null</code>,
* in which case {@link #handleRuntimeException(RuntimeException)} re-throws
* every exception it is given.
*/
public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
/**
 * Handles the given {@link RuntimeException} by delegating to the
 * {@link IOReactorExceptionHandler}, if one has been set.
 * <p>
 * The exception is re-thrown when no handler is set or when the handler's
 * {@code handle} method returns {@code false}.
 *
 * @param ex the runtime exception.
 * @throws RuntimeException the given exception itself, if it was not
 *         handled.
 */
protected void handleRuntimeException(final RuntimeException ex) {
    if (this.exceptionHandler != null && this.exceptionHandler.handle(ex)) {
        // The handler accepted the exception; nothing left to do.
        return;
    }
    throw ex;
}
/**
* This I/O reactor implementation does not react to the
* {@link SelectionKey#OP_ACCEPT} event.
*
* Sub-classes can override this method to react to the event.
*
* @param key the selection key; ignored by this implementation.
*/
@Override
protected void acceptable(final SelectionKey key) {
// Intentionally empty: this implementation ignores OP_ACCEPT.
}
/**
* This I/O reactor implementation does not react to the
* {@link SelectionKey#OP_CONNECT} event.
*
* Sub-classes can override this method to react to the event.
*
* @param key the selection key; ignored by this implementation.
*/
@Override
protected void connectable(final SelectionKey key) {
// Intentionally empty: this implementation ignores OP_CONNECT.
}
/**
 * Processes the {@link SelectionKey#OP_READ} event on the given selection
 * key by dispatching the notification to the
 * {@link IOEventDispatch#inputReady(IOSession)} method.
 * <p>
 * If the session still reports buffered input after the dispatch, it is
 * added to the set of buffering sessions that {@code validate} revisits.
 *
 * @param key the selection key that became readable.
 */
@Override
protected void readable(final SelectionKey key) {
    final IOSession ioSession = getSession(key);
    try {
        this.eventDispatch.inputReady(ioSession);
        final boolean inputPending = ioSession.hasBufferedInput();
        if (inputPending) {
            this.bufferingSessions.add(ioSession);
        }
    } catch (CancelledKeyException cancelled) {
        // The key was cancelled during dispatch: pass the session to
        // queueClosedSession and clear the key's attachment.
        queueClosedSession(ioSession);
        key.attach(null);
    } catch (RuntimeException failure) {
        handleRuntimeException(failure);
    }
}
/**
 * Processes the {@link SelectionKey#OP_WRITE} event on the given selection
 * key by dispatching the notification to the
 * {@link IOEventDispatch#outputReady(IOSession)} method.
 *
 * @param key the selection key that became writable.
 */
@Override
protected void writable(final SelectionKey key) {
    final IOSession ioSession = getSession(key);
    try {
        this.eventDispatch.outputReady(ioSession);
    } catch (CancelledKeyException cancelled) {
        // The key was cancelled during dispatch: pass the session to
        // queueClosedSession and clear the key's attachment.
        queueClosedSession(ioSession);
        key.attach(null);
    } catch (RuntimeException failure) {
        handleRuntimeException(failure);
    }
}
/**
 * Verifies whether any of the sessions associated with the given selection
 * keys timed out, and re-dispatches input for sessions that hold buffered
 * data.
 * <p>
 * When at least {@code timeoutCheckInterval} milliseconds have passed since
 * the previous check, the time of the last check is updated and
 * {@link #timeoutCheck(SelectionKey, long)} is invoked for every key in the
 * given set.
 * <p>
 * Independently of the timeout check, each session in the set of buffering
 * sessions is visited: sessions without buffered input are removed from the
 * set, and sessions whose event mask includes {@link EventMask#READ} get
 * {@link IOEventDispatch#inputReady(IOSession)} invoked on them.
 *
 * @param keys the selection keys to check for timeouts; may be
 *        {@code null}, in which case no timeout check is performed. The
 *        elements are expected to be {@link SelectionKey} instances.
 */
@Override
protected void validate(final Set keys) {
    final long currentTime = System.currentTimeMillis();
    if ((currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
        this.lastTimeoutCheck = currentTime;
        if (keys != null) {
            // The parameter is a raw Set, so its iterator yields Object and
            // every element needs an explicit cast to compile.
            for (Iterator<?> it = keys.iterator(); it.hasNext();) {
                final SelectionKey key = (SelectionKey) it.next();
                timeoutCheck(key, currentTime);
            }
        }
    }
    if (!this.bufferingSessions.isEmpty()) {
        // An explicit iterator is required because entries are removed
        // while the set is being traversed.
        for (Iterator<?> it = this.bufferingSessions.iterator(); it.hasNext();) {
            final IOSession session = (IOSession) it.next();
            if (!session.hasBufferedInput()) {
                it.remove();
                continue;
            }
            try {
                if ((session.getEventMask() & EventMask.READ) > 0) {
                    this.eventDispatch.inputReady(session);
                    if (!session.hasBufferedInput()) {
                        it.remove();
                    }
                }
            } catch (CancelledKeyException ex) {
                it.remove();
                queueClosedSession(session);
            } catch (RuntimeException ex) {
                handleRuntimeException(ex);
            }
        }
    }
}
/**
 * Processes a newly created I/O session by dispatching the notification to
 * the {@link IOEventDispatch#connected(IOSession)} method.
 *
 * @param key the selection key of the new session; not used by this
 *        implementation.
 * @param session the newly created I/O session.
 */
@Override
protected void sessionCreated(final SelectionKey key, final IOSession session) {
    try {
        this.eventDispatch.connected(session);
    } catch (CancelledKeyException cancelled) {
        // The key was cancelled during dispatch: pass the session to
        // queueClosedSession.
        queueClosedSession(session);
    } catch (RuntimeException failure) {
        handleRuntimeException(failure);
    }
}
/**
 * Processes a timed out I/O session by dispatching the notification to the
 * {@link IOEventDispatch#timeout(IOSession)} method.
 *
 * @param session the I/O session that timed out.
 */
@Override
protected void sessionTimedOut(final IOSession session) {
    try {
        this.eventDispatch.timeout(session);
    } catch (CancelledKeyException cancelled) {
        // The key was cancelled during dispatch: pass the session to
        // queueClosedSession.
        queueClosedSession(session);
    } catch (RuntimeException failure) {
        handleRuntimeException(failure);
    }
}
/**
 * Processes a closed I/O session by dispatching the notification to the
 * {@link IOEventDispatch#disconnected(IOSession)} method.
 * <p>
 * A {@link CancelledKeyException} raised by the dispatch is ignored; any
 * other runtime exception is passed to
 * {@link #handleRuntimeException(RuntimeException)}.
 *
 * @param session the I/O session that was closed.
 */
@Override
protected void sessionClosed(final IOSession session) {
    try {
        this.eventDispatch.disconnected(session);
    } catch (CancelledKeyException ignored) {
        // ignore
    } catch (RuntimeException failure) {
        handleRuntimeException(failure);
    }
}
}