1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.io;
28
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.util.Arrays;
34
35 import org.apache.hc.client5.http.ConnectExceptionSupport;
36 import org.apache.hc.client5.http.DnsResolver;
37 import org.apache.hc.client5.http.SchemePortResolver;
38 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
39 import org.apache.hc.client5.http.UnsupportedSchemeException;
40 import org.apache.hc.client5.http.impl.ConnPoolSupport;
41 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
42 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
43 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
46 import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory;
47 import org.apache.hc.core5.annotation.Contract;
48 import org.apache.hc.core5.annotation.Internal;
49 import org.apache.hc.core5.annotation.ThreadingBehavior;
50 import org.apache.hc.core5.http.ConnectionClosedException;
51 import org.apache.hc.core5.http.HttpHost;
52 import org.apache.hc.core5.http.config.Lookup;
53 import org.apache.hc.core5.http.io.SocketConfig;
54 import org.apache.hc.core5.http.protocol.HttpContext;
55 import org.apache.hc.core5.util.Args;
56 import org.apache.hc.core5.util.TimeValue;
57 import org.apache.hc.core5.util.Timeout;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61
62
63
64
65
66
67
68 @Internal
69 @Contract(threading = ThreadingBehavior.STATELESS)
70 public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator {
71
72 static final String SOCKET_FACTORY_REGISTRY = "http.socket-factory-registry";
73
74 private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpClientConnectionOperator.class);
75
76 private final Lookup<ConnectionSocketFactory> socketFactoryRegistry;
77 private final SchemePortResolver schemePortResolver;
78 private final DnsResolver dnsResolver;
79
80 public DefaultHttpClientConnectionOperator(
81 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
82 final SchemePortResolver schemePortResolver,
83 final DnsResolver dnsResolver) {
84 super();
85 Args.notNull(socketFactoryRegistry, "Socket factory registry");
86 this.socketFactoryRegistry = socketFactoryRegistry;
87 this.schemePortResolver = schemePortResolver != null ? schemePortResolver :
88 DefaultSchemePortResolver.INSTANCE;
89 this.dnsResolver = dnsResolver != null ? dnsResolver :
90 SystemDefaultDnsResolver.INSTANCE;
91 }
92
93 @SuppressWarnings("unchecked")
94 private Lookup<ConnectionSocketFactory> getSocketFactoryRegistry(final HttpContext context) {
95 Lookup<ConnectionSocketFactory> reg = (Lookup<ConnectionSocketFactory>) context.getAttribute(
96 SOCKET_FACTORY_REGISTRY);
97 if (reg == null) {
98 reg = this.socketFactoryRegistry;
99 }
100 return reg;
101 }
102
103 @Override
104 public void connect(
105 final ManagedHttpClientConnection conn,
106 final HttpHost host,
107 final InetSocketAddress localAddress,
108 final TimeValue connectTimeout,
109 final SocketConfig socketConfig,
110 final HttpContext context) throws IOException {
111 final Timeout timeout = connectTimeout != null ? Timeout.of(connectTimeout.getDuration(), connectTimeout.getTimeUnit()) : null;
112 connect(conn, host, localAddress, timeout, socketConfig, null, context);
113 }
114
115 @Override
116 public void connect(
117 final ManagedHttpClientConnection conn,
118 final HttpHost host,
119 final InetSocketAddress localAddress,
120 final Timeout connectTimeout,
121 final SocketConfig socketConfig,
122 final Object attachment,
123 final HttpContext context) throws IOException {
124 Args.notNull(conn, "Connection");
125 Args.notNull(host, "Host");
126 Args.notNull(socketConfig, "Socket config");
127 Args.notNull(context, "Context");
128 final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
129 final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
130 if (sf == null) {
131 throw new UnsupportedSchemeException(host.getSchemeName() + " protocol is not supported");
132 }
133 final InetAddress[] remoteAddresses;
134 if (host.getAddress() != null) {
135 remoteAddresses = new InetAddress[] { host.getAddress() };
136 } else {
137 if (LOG.isDebugEnabled()) {
138 LOG.debug("{} resolving remote address", host.getHostName());
139 }
140
141 remoteAddresses = this.dnsResolver.resolve(host.getHostName());
142
143 if (LOG.isDebugEnabled()) {
144 LOG.debug("{} resolved to {}", host.getHostName(), Arrays.asList(remoteAddresses));
145 }
146 }
147
148 final Timeout soTimeout = socketConfig.getSoTimeout();
149
150 final int port = this.schemePortResolver.resolve(host);
151 for (int i = 0; i < remoteAddresses.length; i++) {
152 final InetAddress address = remoteAddresses[i];
153 final boolean last = i == remoteAddresses.length - 1;
154
155 Socket sock = sf.createSocket(context);
156 if (soTimeout != null) {
157 sock.setSoTimeout(soTimeout.toMillisecondsIntBound());
158 }
159 sock.setReuseAddress(socketConfig.isSoReuseAddress());
160 sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
161 sock.setKeepAlive(socketConfig.isSoKeepAlive());
162 if (socketConfig.getRcvBufSize() > 0) {
163 sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
164 }
165 if (socketConfig.getSndBufSize() > 0) {
166 sock.setSendBufferSize(socketConfig.getSndBufSize());
167 }
168
169 final int linger = socketConfig.getSoLinger().toMillisecondsIntBound();
170 if (linger >= 0) {
171 sock.setSoLinger(true, linger);
172 }
173 conn.bind(sock);
174
175 final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
176 if (LOG.isDebugEnabled()) {
177 LOG.debug("{}:{} connecting {}->{} ({})",
178 host.getHostName(), host.getPort(), localAddress, remoteAddress, connectTimeout);
179 }
180 try {
181 sock = sf.connectSocket(sock, host, remoteAddress, localAddress, connectTimeout, attachment, context);
182 conn.bind(sock);
183 conn.setSocketTimeout(soTimeout);
184 if (LOG.isDebugEnabled()) {
185 LOG.debug("{}:{} connected {}->{} as {}",
186 host.getHostName(), host.getPort(), localAddress, remoteAddress, ConnPoolSupport.getId(conn));
187 }
188 return;
189 } catch (final IOException ex) {
190 if (last) {
191 if (LOG.isDebugEnabled()) {
192 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
193 host.getHostName(), host.getPort(), remoteAddress, ex.getClass());
194 }
195 throw ConnectExceptionSupport.enhance(ex, host, remoteAddresses);
196 } else {
197 if (LOG.isDebugEnabled()) {
198 LOG.debug("{}:{} connection to {} failed ({}); retrying connection to the next address",
199 host.getHostName(), host.getPort(), remoteAddress, ex.getClass());
200 }
201 }
202 }
203 }
204 }
205
206 @Override
207 public void upgrade(
208 final ManagedHttpClientConnection conn,
209 final HttpHost host,
210 final HttpContext context) throws IOException {
211 upgrade(conn, host, null, context);
212 }
213
214 @Override
215 public void upgrade(
216 final ManagedHttpClientConnection conn,
217 final HttpHost host,
218 final Object attachment,
219 final HttpContext context) throws IOException {
220 final HttpClientContext clientContext = HttpClientContext.adapt(context);
221 final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(clientContext);
222 final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
223 if (sf == null) {
224 throw new UnsupportedSchemeException(host.getSchemeName() +
225 " protocol is not supported");
226 }
227 if (!(sf instanceof LayeredConnectionSocketFactory)) {
228 throw new UnsupportedSchemeException(host.getSchemeName() +
229 " protocol does not support connection upgrade");
230 }
231 final LayeredConnectionSocketFactory lsf = (LayeredConnectionSocketFactory) sf;
232 Socket sock = conn.getSocket();
233 if (sock == null) {
234 throw new ConnectionClosedException("Connection is closed");
235 }
236 final int port = this.schemePortResolver.resolve(host);
237 sock = lsf.createLayeredSocket(sock, host.getHostName(), port, attachment, context);
238 conn.bind(sock);
239 }
240
241 }