/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.util.Args;
/**
 * Default implementation of {@link IOSession}.
 * <p>
 * The session wraps a {@link SelectionKey} / {@link SocketChannel} pair. The
 * interest set requested through this class is mirrored in an
 * {@link AtomicInteger} so that {@link #setEvent(int)}, {@link #clearEvent(int)}
 * and {@link #setEventMask(int)} can compute the next mask atomically before
 * applying it to the key.
 *
 * @since 4.0
 */
@Contract(threading = ThreadingBehavior.SAFE)
class IOSessionImpl implements IOSession {

    private final SelectionKey key;
    private final SocketChannel channel;
    /** Session status; starts as {@code ACTIVE} and moves to {@code CLOSED} in {@link #close()}. */
    private final AtomicInteger status;
    /** Last interest set requested through this session. */
    private final AtomicInteger eventMask;
    /** Pending commands; consumed from the head by {@link #getCommand()}. */
    private final Deque<Command> commandQueue;

    private volatile IOEventHandler eventHandler;
    private volatile int socketTimeout;

    /**
     * Creates new instance of IOSessionImpl.
     *
     * @param key the selection key.
     * @param socketChannel the socket channel
     *
     * @since 4.1
     */
    public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
        super();
        this.key = Args.notNull(key, "Selection key");
        this.channel = Args.notNull(socketChannel, "Socket channel");
        this.commandQueue = new ConcurrentLinkedDeque<>();
        this.socketTimeout = 0;
        // Seed the tracked mask with whatever the key is currently registered for.
        this.eventMask = new AtomicInteger(key.interestOps());
        this.status = new AtomicInteger(ACTIVE);
    }

    @Override
    public IOEventHandler getHandler() {
        return this.eventHandler;
    }

    @Override
    public void setHandler(final IOEventHandler handler) {
        this.eventHandler = handler;
    }

    /**
     * Appends the command to the tail of the queue and requests write interest.
     */
    @Override
    public void addLast(final Command command) {
        commandQueue.addLast(command);
        setEvent(SelectionKey.OP_WRITE);
    }

    /**
     * Inserts the command at the head of the queue and requests write interest.
     */
    @Override
    public void addFirst(final Command command) {
        commandQueue.addFirst(command);
        setEvent(SelectionKey.OP_WRITE);
    }

    /**
     * Removes and returns the command at the head of the queue.
     *
     * @return the next command, or {@code null} if the queue is empty.
     */
    @Override
    public Command getCommand() {
        return commandQueue.poll();
    }

    @Override
    public ByteChannel channel() {
        return this.channel;
    }

    @Override
    public SocketAddress getLocalAddress() {
        return this.channel.socket().getLocalSocketAddress();
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return this.channel.socket().getRemoteSocketAddress();
    }

    /**
     * Returns the interest set currently registered on the selection key.
     */
    @Override
    public int getEventMask() {
        return this.key.interestOps();
    }

    /**
     * Replaces the interest set with {@code newValue} and wakes up the selector.
     * Does nothing if the session is closed or if the tracked mask already
     * equals {@code newValue}.
     */
    @Override
    public void setEventMask(final int newValue) {
        if (this.status.get() == CLOSED) {
            return;
        }
        // Unconditional swap: a concurrent setEvent / clearEvent must not cause
        // this request to be silently dropped, as a single failed CAS would.
        final int previousValue = this.eventMask.getAndSet(newValue);
        if (previousValue != newValue) {
            this.key.interestOps(newValue);
            this.key.selector().wakeup();
        }
    }

    /**
     * Adds the given operation bits to the interest set and wakes up the
     * selector. Does nothing if the session is closed.
     */
    @Override
    public void setEvent(final int op) {
        if (this.status.get() == CLOSED) {
            return;
        }
        // Retry until the tracked mask is updated atomically.
        for (;;) {
            final int currentValue = this.eventMask.get();
            final int newValue = currentValue | op;
            if (this.eventMask.compareAndSet(currentValue, newValue)) {
                this.key.interestOps(newValue);
                this.key.selector().wakeup();
                return;
            }
        }
    }

    /**
     * Removes the given operation bits from the interest set and wakes up the
     * selector. Does nothing if the session is closed.
     */
    @Override
    public void clearEvent(final int op) {
        if (this.status.get() == CLOSED) {
            return;
        }
        // Retry until the tracked mask is updated atomically.
        for (;;) {
            final int currentValue = this.eventMask.get();
            final int newValue = currentValue & ~op;
            if (this.eventMask.compareAndSet(currentValue, newValue)) {
                this.key.interestOps(newValue);
                this.key.selector().wakeup();
                return;
            }
        }
    }

    @Override
    public int getSocketTimeout() {
        return this.socketTimeout;
    }

    @Override
    public void setSocketTimeout(final int timeout) {
        this.socketTimeout = timeout;
    }

    /**
     * Closes the session: cancels the selection key, detaches its attachment,
     * closes the underlying channel and wakes up the selector if it is still
     * open. Only the first call has any effect.
     */
    @Override
    public void close() {
        // The ACTIVE -> CLOSED transition guarantees the teardown runs once.
        if (this.status.compareAndSet(ACTIVE, CLOSED)) {
            this.key.cancel();
            this.key.attach(null);
            try {
                this.key.channel().close();
            } catch (final IOException ignored) {
                // Best effort: the session is being discarded, so a failure to
                // close the channel cleanly is deliberately not propagated.
            }
            if (this.key.selector().isOpen()) {
                this.key.selector().wakeup();
            }
        }
    }

    @Override
    public int getStatus() {
        return this.status.get();
    }

    /**
     * Reports the session as closed if its status is {@code CLOSED} or if the
     * underlying channel is no longer open.
     */
    @Override
    public boolean isClosed() {
        return this.status.get() == CLOSED || !this.channel.isOpen();
    }

    @Override
    public void shutdown() {
        // For this type of session, a close() does exactly
        // what we need and nothing more.
        close();
    }

    /**
     * Appends one letter per operation bit set in {@code ops}:
     * {@code r}ead, {@code w}rite, {@code a}ccept, {@code c}onnect.
     */
    private static void formatOps(final StringBuilder buffer, final int ops) {
        if ((ops & SelectionKey.OP_READ) != 0) {
            buffer.append('r');
        }
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            buffer.append('w');
        }
        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
            buffer.append('a');
        }
        if ((ops & SelectionKey.OP_CONNECT) != 0) {
            buffer.append('c');
        }
    }

    /**
     * Appends {@code host:port} for an {@link InetSocketAddress}, or the
     * address's string form for any other kind of {@link SocketAddress}.
     */
    private static void formatAddress(final StringBuilder buffer, final SocketAddress socketAddress) {
        if (socketAddress instanceof InetSocketAddress) {
            final InetSocketAddress addr = (InetSocketAddress) socketAddress;
            // An unresolved address has no InetAddress; print its host string
            // rather than the literal "null".
            final String host = addr.isUnresolved()
                    ? addr.getHostString()
                    : addr.getAddress().getHostAddress();
            buffer.append(host)
                    .append(':')
                    .append(addr.getPort());
        } else {
            buffer.append(socketAddress);
        }
    }

    /**
     * Renders the session as {@code local<->remote[STATUS][interest:ready]}.
     * The address part is omitted unless both addresses are available, and the
     * operation part is left empty when the selection key is no longer valid.
     */
    @Override
    public String toString() {
        final StringBuilder buffer = new StringBuilder();
        final SocketAddress remoteAddress = getRemoteAddress();
        final SocketAddress localAddress = getLocalAddress();
        if (remoteAddress != null && localAddress != null) {
            formatAddress(buffer, localAddress);
            buffer.append("<->");
            formatAddress(buffer, remoteAddress);
        }
        buffer.append("[");
        switch (this.status.get()) {
        case ACTIVE:
            buffer.append("ACTIVE");
            break;
        case CLOSING:
            buffer.append("CLOSING");
            break;
        case CLOSED:
            buffer.append("CLOSED");
            break;
        }
        buffer.append("][");
        if (this.key.isValid()) {
            formatOps(buffer, this.key.interestOps());
            buffer.append(":");
            formatOps(buffer, this.key.readyOps());
        }
        buffer.append("]");
        return buffer.toString();
    }

}