1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.util.Arrays;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.ConnectExceptionSupport;
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
42 import org.apache.hc.core5.concurrent.ComplexFuture;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ConnectionInitiator;
46 import org.apache.hc.core5.reactor.IOSession;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 final class MultihomeIOSessionRequester {
52
53 private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class);
54 private final DnsResolver dnsResolver;
55
56 MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
57 this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
58 }
59
60 public Future<IOSession> connect(
61 final ConnectionInitiator connectionInitiator,
62 final NamedEndpoint remoteEndpoint,
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress,
65 final Timeout connectTimeout,
66 final Object attachment,
67 final FutureCallback<IOSession> callback) {
68
69 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
70 if (remoteAddress != null) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("{}:{} connecting {} to {} ({})",
73 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
74 }
75 final Future<IOSession> sessionFuture = connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, new FutureCallback<IOSession>() {
76 @Override
77 public void completed(final IOSession session) {
78 future.completed(session);
79 }
80
81 @Override
82 public void failed(final Exception cause) {
83 if (LOG.isDebugEnabled()) {
84 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
85 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
86 }
87 if (cause instanceof IOException) {
88 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint,
89 (remoteAddress instanceof InetSocketAddress) ?
90 new InetAddress[] { ((InetSocketAddress) remoteAddress).getAddress() } :
91 new InetAddress[] {}));
92 } else {
93 future.failed(cause);
94 }
95 }
96
97 @Override
98 public void cancelled() {
99 future.cancel();
100 }
101
102 });
103 future.setDependency(sessionFuture);
104 return future;
105 }
106
107 if (LOG.isDebugEnabled()) {
108 LOG.debug("{} resolving remote address", remoteEndpoint.getHostName());
109 }
110
111 final InetAddress[] remoteAddresses;
112 try {
113 remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
114 } catch (final UnknownHostException ex) {
115 future.failed(ex);
116 return future;
117 }
118
119 if (LOG.isDebugEnabled()) {
120 LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), Arrays.asList(remoteAddresses));
121 }
122
123 final Runnable runnable = new Runnable() {
124
125 private final AtomicInteger attempt = new AtomicInteger(0);
126
127 void executeNext() {
128 final int index = attempt.getAndIncrement();
129 final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
130
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("{}:{} connecting {}->{} ({})",
133 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
134 }
135
136 final Future<IOSession> sessionFuture = connectionInitiator.connect(
137 remoteEndpoint,
138 remoteAddress,
139 localAddress,
140 connectTimeout,
141 attachment,
142 new FutureCallback<IOSession>() {
143
144 @Override
145 public void completed(final IOSession session) {
146 if (LOG.isDebugEnabled()) {
147 LOG.debug("{}:{} connected {}->{} as {}",
148 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, session.getId());
149 }
150 future.completed(session);
151 }
152
153 @Override
154 public void failed(final Exception cause) {
155 if (attempt.get() >= remoteAddresses.length) {
156 if (LOG.isDebugEnabled()) {
157 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
158 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
159 }
160 if (cause instanceof IOException) {
161 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, remoteAddresses));
162 } else {
163 future.failed(cause);
164 }
165 } else {
166 if (LOG.isDebugEnabled()) {
167 LOG.debug("{}:{} connection to {} failed ({}); retrying connection to the next address",
168 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
169 }
170 executeNext();
171 }
172 }
173
174 @Override
175 public void cancelled() {
176 future.cancel();
177 }
178
179 });
180 future.setDependency(sessionFuture);
181 }
182
183 @Override
184 public void run() {
185 executeNext();
186 }
187
188 };
189 runnable.run();
190 return future;
191 }
192
193 public Future<IOSession> connect(
194 final ConnectionInitiator connectionInitiator,
195 final NamedEndpoint remoteEndpoint,
196 final SocketAddress localAddress,
197 final Timeout connectTimeout,
198 final Object attachment,
199 final FutureCallback<IOSession> callback) {
200 return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
201 }
202
203 }