1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Supplier;
35 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
36 import org.apache.hc.core5.io.CloseMode;
37 import org.apache.hc.core5.reactor.ConnectionInitiator;
38 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
39 import org.apache.hc.core5.reactor.IOReactorStatus;
40 import org.apache.hc.core5.util.TimeValue;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
45
46 enum Status { READY, RUNNING, TERMINATED }
47
48 private static final Logger LOG = LoggerFactory.getLogger(AbstractHttpAsyncClientBase.class);
49
50 private final AsyncPushConsumerRegistry pushConsumerRegistry;
51 private final DefaultConnectingIOReactor ioReactor;
52 private final ExecutorService executorService;
53 private final AtomicReference<Status> status;
54
55 AbstractHttpAsyncClientBase(
56 final DefaultConnectingIOReactor ioReactor,
57 final AsyncPushConsumerRegistry pushConsumerRegistry,
58 final ThreadFactory threadFactory) {
59 super();
60 this.ioReactor = ioReactor;
61 this.pushConsumerRegistry = pushConsumerRegistry;
62 this.executorService = Executors.newSingleThreadExecutor(threadFactory);
63 this.status = new AtomicReference<>(Status.READY);
64 }
65
66 @Override
67 public final void start() {
68 if (status.compareAndSet(Status.READY, Status.RUNNING)) {
69 executorService.execute(ioReactor::start);
70 }
71 }
72
73 @Override
74 public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
75 pushConsumerRegistry.register(hostname, uriPattern, supplier);
76 }
77
78 boolean isRunning() {
79 return status.get() == Status.RUNNING;
80 }
81
82 ConnectionInitiator getConnectionInitiator() {
83 return ioReactor;
84 }
85
86 @Override
87 public final IOReactorStatus getStatus() {
88 return ioReactor.getStatus();
89 }
90
91 @Override
92 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
93 ioReactor.awaitShutdown(waitTime);
94 }
95
96 @Override
97 public final void initiateShutdown() {
98 if (LOG.isDebugEnabled()) {
99 LOG.debug("Initiating shutdown");
100 }
101 ioReactor.initiateShutdown();
102 }
103
104 void internalClose(final CloseMode closeMode) {
105 }
106
107 @Override
108 public final void close(final CloseMode closeMode) {
109 if (LOG.isDebugEnabled()) {
110 LOG.debug("Shutdown {}", closeMode);
111 }
112 ioReactor.initiateShutdown();
113 ioReactor.close(closeMode);
114 executorService.shutdownNow();
115 internalClose(closeMode);
116 }
117
118 @Override
119 public void close() {
120 close(CloseMode.GRACEFUL);
121 }
122
123 }