1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.util.Arrays;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.ConnectExceptionSupport;
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
42 import org.apache.hc.core5.concurrent.ComplexFuture;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ConnectionInitiator;
46 import org.apache.hc.core5.reactor.IOSession;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 final class MultihomeIOSessionRequester {
52
53 private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class);
54 private final DnsResolver dnsResolver;
55
56 MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
57 this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
58 }
59
60 public Future<IOSession> connect(
61 final ConnectionInitiator connectionInitiator,
62 final NamedEndpoint remoteEndpoint,
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress,
65 final Timeout connectTimeout,
66 final Object attachment,
67 final FutureCallback<IOSession> callback) {
68
69 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
70 if (remoteAddress != null) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("{}:{} connecting {} to {} ({})",
73 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
74 }
75 final Future<IOSession> sessionFuture = connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, new FutureCallback<IOSession>() {
76 @Override
77 public void completed(final IOSession session) {
78 future.completed(session);
79 }
80
81 @Override
82 public void failed(final Exception cause) {
83 if (LOG.isDebugEnabled()) {
84 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
85 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
86 }
87 if (cause instanceof IOException) {
88 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint,
89 (remoteAddress instanceof InetSocketAddress) ?
90 new InetAddress[] { ((InetSocketAddress) remoteAddress).getAddress() } :
91 new InetAddress[] {}));
92 } else {
93 future.failed(cause);
94 }
95 }
96
97 @Override
98 public void cancelled() {
99 future.cancel();
100 }
101
102 });
103 future.setDependency(sessionFuture);
104 return future;
105 }
106
107 if (LOG.isDebugEnabled()) {
108 LOG.debug("{} resolving remote address", remoteEndpoint.getHostName());
109 }
110
111 final InetAddress[] remoteAddresses;
112 try {
113 remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
114 if (remoteAddresses == null || remoteAddresses.length == 0) {
115 throw new UnknownHostException(remoteEndpoint.getHostName());
116 }
117 } catch (final UnknownHostException ex) {
118 future.failed(ex);
119 return future;
120 }
121
122 if (LOG.isDebugEnabled()) {
123 LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), Arrays.asList(remoteAddresses));
124 }
125
126 final Runnable runnable = new Runnable() {
127
128 private final AtomicInteger attempt = new AtomicInteger(0);
129
130 void executeNext() {
131 final int index = attempt.getAndIncrement();
132 final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
133
134 if (LOG.isDebugEnabled()) {
135 LOG.debug("{}:{} connecting {}->{} ({})",
136 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
137 }
138
139 final Future<IOSession> sessionFuture = connectionInitiator.connect(
140 remoteEndpoint,
141 remoteAddress,
142 localAddress,
143 connectTimeout,
144 attachment,
145 new FutureCallback<IOSession>() {
146
147 @Override
148 public void completed(final IOSession session) {
149 if (LOG.isDebugEnabled()) {
150 LOG.debug("{}:{} connected {}->{} as {}",
151 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, session.getId());
152 }
153 future.completed(session);
154 }
155
156 @Override
157 public void failed(final Exception cause) {
158 if (attempt.get() >= remoteAddresses.length) {
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
161 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
162 }
163 if (cause instanceof IOException) {
164 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, remoteAddresses));
165 } else {
166 future.failed(cause);
167 }
168 } else {
169 if (LOG.isDebugEnabled()) {
170 LOG.debug("{}:{} connection to {} failed ({}); retrying connection to the next address",
171 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
172 }
173 executeNext();
174 }
175 }
176
177 @Override
178 public void cancelled() {
179 future.cancel();
180 }
181
182 });
183 future.setDependency(sessionFuture);
184 }
185
186 @Override
187 public void run() {
188 executeNext();
189 }
190
191 };
192 runnable.run();
193 return future;
194 }
195
196 public Future<IOSession> connect(
197 final ConnectionInitiator connectionInitiator,
198 final NamedEndpoint remoteEndpoint,
199 final SocketAddress localAddress,
200 final Timeout connectTimeout,
201 final Object attachment,
202 final FutureCallback<IOSession> callback) {
203 return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
204 }
205
206 }