1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.util.Arrays;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.ConnectExceptionSupport;
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
42 import org.apache.hc.core5.concurrent.ComplexFuture;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ConnectionInitiator;
46 import org.apache.hc.core5.reactor.IOSession;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 final class MultihomeIOSessionRequester {
52
53 private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class);
54 private final DnsResolver dnsResolver;
55
56 MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
57 this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
58 }
59
60 public Future<IOSession> connect(
61 final ConnectionInitiator connectionInitiator,
62 final NamedEndpoint remoteEndpoint,
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress,
65 final Timeout connectTimeout,
66 final Object attachment,
67 final FutureCallback<IOSession> callback) {
68
69 if (remoteAddress != null) {
70 if (LOG.isDebugEnabled()) {
71 LOG.debug("{} connecting {} to {} ({})", remoteEndpoint, localAddress, remoteAddress, connectTimeout);
72 }
73 return connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, callback);
74 }
75
76 if (LOG.isDebugEnabled()) {
77 LOG.debug("{} resolving remote address", remoteEndpoint);
78 }
79
80 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
81 final InetAddress[] remoteAddresses;
82 try {
83 remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
84 } catch (final UnknownHostException ex) {
85 future.failed(ex);
86 return future;
87 }
88
89 if (LOG.isDebugEnabled()) {
90 LOG.debug("{} resolved to {}", remoteEndpoint, Arrays.asList(remoteAddresses));
91 }
92
93 final Runnable runnable = new Runnable() {
94
95 private final AtomicInteger attempt = new AtomicInteger(0);
96
97 void executeNext() {
98 final int index = attempt.getAndIncrement();
99 final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
100
101 if (LOG.isDebugEnabled()) {
102 LOG.debug("{} connecting {} to {} ({})", remoteEndpoint, localAddress, remoteAddress, connectTimeout);
103 }
104
105 final Future<IOSession> sessionFuture = connectionInitiator.connect(
106 remoteEndpoint,
107 remoteAddress,
108 localAddress,
109 connectTimeout,
110 attachment,
111 new FutureCallback<IOSession>() {
112
113 @Override
114 public void completed(final IOSession session) {
115 if (LOG.isDebugEnabled()) {
116 if (LOG.isDebugEnabled()) {
117 LOG.debug("{} connected {} {}->{}", remoteEndpoint, session.getId(), session.getLocalAddress(), session.getRemoteAddress());
118 }
119 }
120 future.completed(session);
121 }
122
123 @Override
124 public void failed(final Exception cause) {
125 if (attempt.get() >= remoteAddresses.length) {
126 if (LOG.isDebugEnabled()) {
127 LOG.debug("{} connection to {} failed ({}); terminating operation", remoteEndpoint, remoteAddress, cause.getClass());
128 }
129 if (cause instanceof IOException) {
130 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, remoteAddresses));
131 } else {
132 future.failed(cause);
133 }
134 } else {
135 if (LOG.isDebugEnabled()) {
136 LOG.debug("{} connection to {} failed ({}); retrying connection to the next address", remoteEndpoint, remoteAddress, cause.getClass());
137 }
138 executeNext();
139 }
140 }
141
142 @Override
143 public void cancelled() {
144 future.cancel();
145 }
146
147 });
148 future.setDependency(sessionFuture);
149 }
150
151 @Override
152 public void run() {
153 executeNext();
154 }
155
156 };
157 runnable.run();
158 return future;
159 }
160
161 public Future<IOSession> connect(
162 final ConnectionInitiator connectionInitiator,
163 final NamedEndpoint remoteEndpoint,
164 final SocketAddress localAddress,
165 final Timeout connectTimeout,
166 final Object attachment,
167 final FutureCallback<IOSession> callback) {
168 return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
169 }
170
171 }