1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Supplier;
35 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
36 import org.apache.hc.core5.io.CloseMode;
37 import org.apache.hc.core5.reactor.ConnectionInitiator;
38 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
39 import org.apache.hc.core5.reactor.IOReactorStatus;
40 import org.apache.hc.core5.util.TimeValue;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
45
46 enum Status { READY, RUNNING, TERMINATED }
47
48 private static final Logger LOG = LoggerFactory.getLogger(AbstractHttpAsyncClientBase.class);
49
50 private final AsyncPushConsumerRegistry pushConsumerRegistry;
51 private final DefaultConnectingIOReactor ioReactor;
52 private final ExecutorService executorService;
53 private final AtomicReference<Status> status;
54
55 AbstractHttpAsyncClientBase(
56 final DefaultConnectingIOReactor ioReactor,
57 final AsyncPushConsumerRegistry pushConsumerRegistry,
58 final ThreadFactory threadFactory) {
59 super();
60 this.ioReactor = ioReactor;
61 this.pushConsumerRegistry = pushConsumerRegistry;
62 this.executorService = Executors.newSingleThreadExecutor(threadFactory);
63 this.status = new AtomicReference<>(Status.READY);
64 }
65
66 @Override
67 public final void start() {
68 if (status.compareAndSet(Status.READY, Status.RUNNING)) {
69 executorService.execute(ioReactor::start);
70 }
71 }
72
73
74
75
76
77 @Deprecated
78 @Override
79 public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
80 pushConsumerRegistry.register(hostname, uriPattern, supplier);
81 }
82
83 boolean isRunning() {
84 return status.get() == Status.RUNNING;
85 }
86
87 ConnectionInitiator getConnectionInitiator() {
88 return ioReactor;
89 }
90
91 @Override
92 public final IOReactorStatus getStatus() {
93 return ioReactor.getStatus();
94 }
95
96 @Override
97 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
98 ioReactor.awaitShutdown(waitTime);
99 }
100
101 @Override
102 public final void initiateShutdown() {
103 if (LOG.isDebugEnabled()) {
104 LOG.debug("Initiating shutdown");
105 }
106 ioReactor.initiateShutdown();
107 }
108
109 void internalClose(final CloseMode closeMode) {
110 }
111
112 @Override
113 public final void close(final CloseMode closeMode) {
114 if (LOG.isDebugEnabled()) {
115 LOG.debug("Shutdown {}", closeMode);
116 }
117 ioReactor.initiateShutdown();
118 ioReactor.close(closeMode);
119 executorService.shutdownNow();
120 internalClose(closeMode);
121 }
122
123 @Override
124 public void close() {
125 close(CloseMode.GRACEFUL);
126 }
127
128 }