1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.IOException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Callback;
35 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
36 import org.apache.hc.core5.io.CloseMode;
37 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
38 import org.apache.hc.core5.reactor.IOReactorConfig;
39 import org.apache.hc.core5.reactor.IOReactorService;
40 import org.apache.hc.core5.reactor.IOReactorStatus;
41 import org.apache.hc.core5.reactor.IOSession;
42 import org.apache.hc.core5.util.Args;
43 import org.apache.hc.core5.util.Asserts;
44 import org.apache.hc.core5.util.TimeValue;
45
46 abstract class IOReactorExecutor<T extends IOReactorService> implements AutoCloseable {
47
48 enum Status { READY, RUNNING, TERMINATED }
49
50 private final IOReactorConfig ioReactorConfig;
51 private final ThreadFactory workerThreadFactory;
52 private final AtomicReference<T> ioReactorRef;
53 private final AtomicReference<Status> status;
54
55 IOReactorExecutor(final IOReactorConfig ioReactorConfig, final ThreadFactory workerThreadFactory) {
56 super();
57 this.ioReactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
58 this.workerThreadFactory = workerThreadFactory;
59 this.ioReactorRef = new AtomicReference<>();
60 this.status = new AtomicReference<>(Status.READY);
61 }
62
63 abstract T createIOReactor(
64 IOEventHandlerFactory ioEventHandlerFactory,
65 IOReactorConfig ioReactorConfig,
66 ThreadFactory threadFactory,
67 Callback<IOSession> sessionShutdownCallback) throws IOException;
68
69 protected void execute(final IOEventHandlerFactory ioEventHandlerFactory) throws IOException {
70 Args.notNull(ioEventHandlerFactory, "Handler factory");
71 if (ioReactorRef.compareAndSet(null, createIOReactor(
72 ioEventHandlerFactory,
73 ioReactorConfig,
74 workerThreadFactory,
75 ShutdownCommand.GRACEFUL_NORMAL_CALLBACK))) {
76 if (status.compareAndSet(Status.READY, Status.RUNNING)) {
77 ioReactorRef.get().start();
78 }
79 } else {
80 throw new IllegalStateException("I/O reactor has already been started");
81 }
82 }
83
84 private T ensureRunning() {
85 final T ioReactor = ioReactorRef.get();
86 Asserts.check(ioReactor != null, "I/O reactor has not been started");
87 return ioReactor;
88 }
89
90 T reactor() {
91 return ensureRunning();
92 }
93
94 public IOReactorStatus getStatus() {
95 final T ioReactor = ioReactorRef.get();
96 return ioReactor != null ? ioReactor.getStatus() : IOReactorStatus.INACTIVE;
97 }
98
99 public void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
100 Args.notNull(waitTime, "Wait time");
101 final T ioReactor = ioReactorRef.get();
102 if (ioReactor != null) {
103 ioReactor.awaitShutdown(waitTime);
104 }
105 }
106
107 public void initiateShutdown() {
108 final T ioReactor = ioReactorRef.get();
109 if (ioReactor != null) {
110 if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
111 ioReactor.initiateShutdown();
112 }
113 }
114 }
115
116 public void shutdown(final TimeValue graceTime) {
117 Args.notNull(graceTime, "Grace time");
118 final T ioReactor = ioReactorRef.get();
119 if (ioReactor != null) {
120 if (status.compareAndSet(Status.RUNNING, Status.TERMINATED)) {
121 ioReactor.initiateShutdown();
122 }
123 try {
124 ioReactor.awaitShutdown(graceTime);
125 } catch (final InterruptedException ex) {
126 Thread.currentThread().interrupt();
127 }
128 ioReactor.close(CloseMode.IMMEDIATE);
129 }
130 }
131
132 @Override
133 public void close() throws Exception {
134 shutdown(TimeValue.ofSeconds(5));
135 }
136
137 }