/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* .
*
*/
package org.apache.hc.core5.http.impl.nio.bootstrap;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.reactor.AbstractMultiworkerIOReactor;
import org.apache.hc.core5.reactor.ExceptionEvent;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
abstract class IOReactorExecutor implements AutoCloseable {
enum Status { READY, RUNNING, TERMINATED }
private final IOReactorConfig ioReactorConfig;
private final ExceptionListener exceptionListener;
private final Callback sessionShutdownCallback;
private final ExecutorService executorService;
private final ThreadFactory workerThreadFactory;
private final AtomicReference ioReactorRef;
private final AtomicReference status;
IOReactorExecutor(
final IOReactorConfig ioReactorConfig,
final ExceptionListener exceptionListener,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final Callback sessionShutdownCallback) {
super();
this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
this.exceptionListener = exceptionListener;
this.sessionShutdownCallback = sessionShutdownCallback;
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.workerThreadFactory = workerThreadFactory;
this.ioReactorRef = new AtomicReference<>(null);
this.status = new AtomicReference<>(Status.READY);
}
abstract T createIOReactor(
IOEventHandlerFactory ioEventHandlerFactory,
IOReactorConfig ioReactorConfig,
ThreadFactory threadFactory) throws IOException;
protected void execute(final IOEventHandlerFactory ioEventHandlerFactory) throws IOException {
Args.notNull(ioEventHandlerFactory, "Handler factory");
if (ioReactorRef.compareAndSet(null, createIOReactor(
ioEventHandlerFactory,
ioReactorConfig,
workerThreadFactory != null ? workerThreadFactory : new ThreadFactoryImpl("i/o dispatch")))) {
if (status.compareAndSet(Status.READY, Status.RUNNING)) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ioReactorRef.get().execute();
} catch (Exception ex) {
if (exceptionListener != null) {
exceptionListener.onError(ex);
}
}
}
});
}
} else {
throw new IllegalStateException("I/O reactor has already been started");
}
}
private T ensureRunning() {
final T ioReactor = ioReactorRef.get();
Asserts.check(ioReactor != null, "I/O reactor has not been started");
return ioReactor;
}
T reactor() {
return ensureRunning();
}
public IOReactorStatus getStatus() {
final T ioReactor = ioReactorRef.get();
return ioReactor != null ? ioReactor.getStatus() : IOReactorStatus.INACTIVE;
}
public List getAuditLog() {
final T ioReactor = ensureRunning();
return ioReactor.getAuditLog();
}
public void awaitShutdown(final long deadline, final TimeUnit timeUnit) throws InterruptedException {
final T ioReactor = ioReactorRef.get();
if (ioReactor != null) {
ioReactor.awaitShutdown(deadline, timeUnit);
}
}
private void initiateShutdown(final T ioReactor) {
if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
ioReactor.initiateShutdown();
if (sessionShutdownCallback != null) {
ioReactor.enumSessions(sessionShutdownCallback);
}
}
}
public void initiateShutdown() {
final T ioReactor = ioReactorRef.get();
if (ioReactor != null) {
initiateShutdown(ioReactor);
}
}
public void shutdown(final long graceTime, final TimeUnit timeUnit) {
final T ioReactor = ioReactorRef.get();
if (ioReactor != null) {
initiateShutdown(ioReactor);
ioReactor.shutdown(graceTime, timeUnit);
}
}
@Override
public void close() throws Exception {
shutdown(5, TimeUnit.SECONDS);
}
}