1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.util.Arrays;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.ConnectExceptionSupport;
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
42 import org.apache.hc.core5.concurrent.ComplexFuture;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ConnectionInitiator;
46 import org.apache.hc.core5.reactor.IOSession;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 final class MultihomeIOSessionRequester {
52
53 private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class);
54 private final DnsResolver dnsResolver;
55
56 MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
57 this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
58 }
59
60 public Future<IOSession> connect(
61 final ConnectionInitiator connectionInitiator,
62 final NamedEndpoint remoteEndpoint,
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress,
65 final Timeout connectTimeout,
66 final Object attachment,
67 final FutureCallback<IOSession> callback) {
68
69 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
70 if (remoteAddress != null) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("{} connecting {} to {} ({})", remoteEndpoint, localAddress, remoteAddress, connectTimeout);
73 }
74 final Future<IOSession> sessionFuture = connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, new FutureCallback<IOSession>() {
75 @Override
76 public void completed(final IOSession session) {
77 future.completed(session);
78 }
79
80 @Override
81 public void failed(final Exception cause) {
82 if (LOG.isDebugEnabled()) {
83 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
84 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
85 }
86 if (cause instanceof IOException) {
87 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint,
88 (remoteAddress instanceof InetSocketAddress) ?
89 new InetAddress[] { ((InetSocketAddress) remoteAddress).getAddress() } :
90 new InetAddress[] {}));
91 } else {
92 future.failed(cause);
93 }
94 }
95
96 @Override
97 public void cancelled() {
98 future.cancel();
99 }
100
101 });
102 future.setDependency(sessionFuture);
103 return future;
104 }
105
106 if (LOG.isDebugEnabled()) {
107 LOG.debug("{} resolving remote address", remoteEndpoint);
108 }
109
110 final InetAddress[] remoteAddresses;
111 try {
112 remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
113 } catch (final UnknownHostException ex) {
114 future.failed(ex);
115 return future;
116 }
117
118 if (LOG.isDebugEnabled()) {
119 LOG.debug("{} resolved to {}", remoteEndpoint, Arrays.asList(remoteAddresses));
120 }
121
122 final Runnable runnable = new Runnable() {
123
124 private final AtomicInteger attempt = new AtomicInteger(0);
125
126 void executeNext() {
127 final int index = attempt.getAndIncrement();
128 final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
129
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("{} connecting {} to {} ({})", remoteEndpoint, localAddress, remoteAddress, connectTimeout);
132 }
133
134 final Future<IOSession> sessionFuture = connectionInitiator.connect(
135 remoteEndpoint,
136 remoteAddress,
137 localAddress,
138 connectTimeout,
139 attachment,
140 new FutureCallback<IOSession>() {
141
142 @Override
143 public void completed(final IOSession session) {
144 if (LOG.isDebugEnabled()) {
145 if (LOG.isDebugEnabled()) {
146 LOG.debug("{} connected {} {}->{}", remoteEndpoint, session.getId(), session.getLocalAddress(), session.getRemoteAddress());
147 }
148 }
149 future.completed(session);
150 }
151
152 @Override
153 public void failed(final Exception cause) {
154 if (attempt.get() >= remoteAddresses.length) {
155 if (LOG.isDebugEnabled()) {
156 LOG.debug("{} connection to {} failed ({}); terminating operation", remoteEndpoint, remoteAddress, cause.getClass());
157 }
158 if (cause instanceof IOException) {
159 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, remoteAddresses));
160 } else {
161 future.failed(cause);
162 }
163 } else {
164 if (LOG.isDebugEnabled()) {
165 LOG.debug("{} connection to {} failed ({}); retrying connection to the next address", remoteEndpoint, remoteAddress, cause.getClass());
166 }
167 executeNext();
168 }
169 }
170
171 @Override
172 public void cancelled() {
173 future.cancel();
174 }
175
176 });
177 future.setDependency(sessionFuture);
178 }
179
180 @Override
181 public void run() {
182 executeNext();
183 }
184
185 };
186 runnable.run();
187 return future;
188 }
189
190 public Future<IOSession> connect(
191 final ConnectionInitiator connectionInitiator,
192 final NamedEndpoint remoteEndpoint,
193 final SocketAddress localAddress,
194 final Timeout connectTimeout,
195 final Object attachment,
196 final FutureCallback<IOSession> callback) {
197 return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
198 }
199
200 }