1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.function.Supplier;
35 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
36 import org.apache.hc.core5.io.CloseMode;
37 import org.apache.hc.core5.reactor.ConnectionInitiator;
38 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
39 import org.apache.hc.core5.reactor.IOReactorStatus;
40 import org.apache.hc.core5.util.TimeValue;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
45
46 enum Status { READY, RUNNING, TERMINATED }
47
48 private static final Logger LOG = LoggerFactory.getLogger(AbstractHttpAsyncClientBase.class);
49
50 private final AsyncPushConsumerRegistry pushConsumerRegistry;
51 private final DefaultConnectingIOReactor ioReactor;
52 private final ExecutorService executorService;
53 private final AtomicReference<Status> status;
54
55 AbstractHttpAsyncClientBase(
56 final DefaultConnectingIOReactor ioReactor,
57 final AsyncPushConsumerRegistry pushConsumerRegistry,
58 final ThreadFactory threadFactory) {
59 super();
60 this.ioReactor = ioReactor;
61 this.pushConsumerRegistry = pushConsumerRegistry;
62 this.executorService = Executors.newSingleThreadExecutor(threadFactory);
63 this.status = new AtomicReference<>(Status.READY);
64 }
65
66 @Override
67 public final void start() {
68 if (status.compareAndSet(Status.READY, Status.RUNNING)) {
69 executorService.execute(new Runnable() {
70
71 @Override
72 public void run() {
73 ioReactor.start();
74 }
75 });
76 }
77 }
78
79 @Override
80 public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
81 pushConsumerRegistry.register(hostname, uriPattern, supplier);
82 }
83
84 boolean isRunning() {
85 return status.get() == Status.RUNNING;
86 }
87
88 ConnectionInitiator getConnectionInitiator() {
89 return ioReactor;
90 }
91
92 @Override
93 public final IOReactorStatus getStatus() {
94 return ioReactor.getStatus();
95 }
96
97 @Override
98 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
99 ioReactor.awaitShutdown(waitTime);
100 }
101
102 @Override
103 public final void initiateShutdown() {
104 if (LOG.isDebugEnabled()) {
105 LOG.debug("Initiating shutdown");
106 }
107 ioReactor.initiateShutdown();
108 }
109
110 void internalClose(final CloseMode closeMode) {
111 }
112
113 @Override
114 public final void close(final CloseMode closeMode) {
115 if (LOG.isDebugEnabled()) {
116 LOG.debug("Shutdown {}", closeMode);
117 }
118 ioReactor.initiateShutdown();
119 ioReactor.close(closeMode);
120 executorService.shutdownNow();
121 internalClose(closeMode);
122 }
123
124 @Override
125 public void close() {
126 close(CloseMode.GRACEFUL);
127 }
128
129 }