1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40 import java.util.concurrent.locks.Lock;
41
42 import javax.net.ssl.SSLContext;
43 import javax.net.ssl.SSLSession;
44
45 import org.apache.hc.core5.concurrent.CallbackContribution;
46 import org.apache.hc.core5.concurrent.FutureCallback;
47 import org.apache.hc.core5.function.Decorator;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.net.NamedEndpoint;
50 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
51 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
52 import org.apache.hc.core5.reactor.ssl.SSLMode;
53 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
54 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
55 import org.apache.hc.core5.reactor.ssl.TlsDetails;
56 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
57 import org.apache.hc.core5.util.Args;
58 import org.apache.hc.core5.util.Asserts;
59 import org.apache.hc.core5.util.TextUtils;
60 import org.apache.hc.core5.util.Timeout;
61
62 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
63
64 private final IOSession ioSession;
65 private final NamedEndpoint initialEndpoint;
66 private final Decorator<IOSession> ioSessionDecorator;
67 private final IOSessionListener sessionListener;
68 private final Queue<InternalDataChannel> closedSessions;
69 private final AtomicReference<SSLIOSession> tlsSessionRef;
70 private final AtomicReference<IOSession> currentSessionRef;
71 private final AtomicReference<IOEventHandler> eventHandlerRef;
72 private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
73 private final AtomicBoolean closed;
74
75 InternalDataChannel(
76 final IOSession ioSession,
77 final NamedEndpoint initialEndpoint,
78 final Decorator<IOSession> ioSessionDecorator,
79 final IOSessionListener sessionListener,
80 final Queue<InternalDataChannel> closedSessions) {
81 this.ioSession = ioSession;
82 this.initialEndpoint = initialEndpoint;
83 this.closedSessions = closedSessions;
84 this.ioSessionDecorator = ioSessionDecorator;
85 this.sessionListener = sessionListener;
86 this.tlsSessionRef = new AtomicReference<>();
87 this.currentSessionRef = new AtomicReference<>(
88 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
89 this.eventHandlerRef = new AtomicReference<>();
90 this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
91 this.closed = new AtomicBoolean(false);
92 }
93
94 @Override
95 public String getId() {
96 return ioSession.getId();
97 }
98
99 @Override
100 public NamedEndpoint getInitialEndpoint() {
101 return initialEndpoint;
102 }
103
104 @Override
105 public IOEventHandler getHandler() {
106 return eventHandlerRef.get();
107 }
108
109 @Override
110 public void upgrade(final IOEventHandler handler) {
111 final IOSession currentSession = currentSessionRef.get();
112 currentSession.upgrade(handler);
113 eventHandlerRef.set(handler);
114 }
115
116 private IOEventHandler ensureHandler(final IOSession session) {
117 final IOEventHandler handler = session.getHandler();
118 Asserts.notNull(handler, "IO event handler");
119 return handler;
120 }
121
122 @Override
123 void onIOEvent(final int readyOps) throws IOException {
124 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
125 final IOSession currentSession = currentSessionRef.get();
126 currentSession.clearEvent(SelectionKey.OP_CONNECT);
127 if (tlsSessionRef.get() == null) {
128 if (sessionListener != null) {
129 sessionListener.connected(currentSession);
130 }
131 final IOEventHandler handler = ensureHandler(currentSession);
132 handler.connected(currentSession);
133 }
134 }
135 if ((readyOps & SelectionKey.OP_READ) != 0) {
136 final IOSession currentSession = currentSessionRef.get();
137 currentSession.updateReadTime();
138 if (sessionListener != null) {
139 sessionListener.inputReady(currentSession);
140 }
141 final IOEventHandler handler = ensureHandler(currentSession);
142 handler.inputReady(currentSession, null);
143 }
144 if ((readyOps & SelectionKey.OP_WRITE) != 0
145 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
146 final IOSession currentSession = currentSessionRef.get();
147 currentSession.updateWriteTime();
148 if (sessionListener != null) {
149 sessionListener.outputReady(currentSession);
150 }
151 final IOEventHandler handler = ensureHandler(currentSession);
152 handler.outputReady(currentSession);
153 }
154 }
155
156 @Override
157 Timeout getTimeout() {
158 final IOSession currentSession = currentSessionRef.get();
159 return currentSession.getSocketTimeout();
160 }
161
162 @Override
163 void onTimeout(final Timeout timeout) throws IOException {
164 final IOSession currentSession = currentSessionRef.get();
165 if (sessionListener != null) {
166 sessionListener.timeout(currentSession);
167 }
168 final IOEventHandler handler = ensureHandler(currentSession);
169 handler.timeout(currentSession, timeout);
170 }
171
172 @Override
173 void onException(final Exception cause) {
174 final IOSession currentSession = currentSessionRef.get();
175 if (sessionListener != null) {
176 sessionListener.exception(currentSession, cause);
177 }
178 final IOEventHandler handler = currentSession.getHandler();
179 if (handler != null) {
180 handler.exception(currentSession, cause);
181 }
182 }
183
184 void onTLSSessionStart(final SSLIOSession sslSession) {
185 final IOSession currentSession = currentSessionRef.get();
186 if (sessionListener != null) {
187 sessionListener.connected(currentSession);
188 }
189 }
190
191 void onTLSSessionEnd(final SSLIOSession sslSession) {
192 if (closed.compareAndSet(false, true)) {
193 closedSessions.add(this);
194 }
195 }
196
197 void disconnected() {
198 final IOSession currentSession = currentSessionRef.get();
199 if (sessionListener != null) {
200 sessionListener.disconnected(currentSession);
201 }
202 final IOEventHandler handler = currentSession.getHandler();
203 if (handler != null) {
204 handler.disconnected(currentSession);
205 }
206 }
207
208 @Override
209 public void startTls(
210 final SSLContext sslContext,
211 final NamedEndpoint endpoint,
212 final SSLBufferMode sslBufferMode,
213 final SSLSessionInitializer initializer,
214 final SSLSessionVerifier verifier,
215 final Timeout handshakeTimeout) {
216 startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null);
217 }
218
219 @Override
220 public void startTls(
221 final SSLContext sslContext,
222 final NamedEndpoint endpoint,
223 final SSLBufferMode sslBufferMode,
224 final SSLSessionInitializer initializer,
225 final SSLSessionVerifier verifier,
226 final Timeout handshakeTimeout,
227 final FutureCallback<TransportSecurityLayer> callback) {
228 final SSLIOSession sslioSession = new SSLIOSession(
229 endpoint != null ? endpoint : initialEndpoint,
230 ioSession,
231 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
232 sslContext,
233 sslBufferMode,
234 initializer,
235 verifier,
236 handshakeTimeout,
237 this::onTLSSessionStart,
238 this::onTLSSessionEnd,
239 new CallbackContribution<SSLSession>(callback) {
240
241 @Override
242 public void completed(final SSLSession sslSession) {
243 if (callback != null) {
244 callback.completed(InternalDataChannel.this);
245 }
246 }
247
248 });
249 if (tlsSessionRef.compareAndSet(null, sslioSession)) {
250 currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
251 } else {
252 throw new IllegalStateException("TLS already activated");
253 }
254 try {
255 if (sessionListener != null) {
256 sessionListener.startTls(sslioSession);
257 }
258 sslioSession.beginHandshake(this);
259 } catch (final Exception ex) {
260 onException(ex);
261 }
262 }
263
264 @SuppressWarnings("resource")
265 @Override
266 public TlsDetails getTlsDetails() {
267 final SSLIOSession sslIoSession = tlsSessionRef.get();
268 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
269 }
270
271 @Override
272 public Lock getLock() {
273 return ioSession.getLock();
274 }
275
276 @Override
277 public void close() {
278 close(CloseMode.GRACEFUL);
279 }
280
281 @Override
282 public void close(final CloseMode closeMode) {
283 final IOSession currentSession = currentSessionRef.get();
284 if (closeMode == CloseMode.IMMEDIATE) {
285 closed.set(true);
286 currentSession.close(closeMode);
287 } else {
288 if (closed.compareAndSet(false, true)) {
289 try {
290 currentSession.close(closeMode);
291 } finally {
292 closedSessions.add(this);
293 }
294 }
295 }
296 }
297
298 @Override
299 public IOSession.Status getStatus() {
300 final IOSession currentSession = currentSessionRef.get();
301 return currentSession.getStatus();
302 }
303
304 @Override
305 public boolean isOpen() {
306 final IOSession currentSession = currentSessionRef.get();
307 return currentSession.isOpen();
308 }
309
310 @Override
311 public void enqueue(final Command command, final Command.Priority priority) {
312 final IOSession currentSession = currentSessionRef.get();
313 currentSession.enqueue(command, priority);
314 }
315
316 @Override
317 public boolean hasCommands() {
318 final IOSession currentSession = currentSessionRef.get();
319 return currentSession.hasCommands();
320 }
321
322 @Override
323 public Command poll() {
324 final IOSession currentSession = currentSessionRef.get();
325 return currentSession.poll();
326 }
327
328 @Override
329 public ByteChannel channel() {
330 final IOSession currentSession = currentSessionRef.get();
331 return currentSession.channel();
332 }
333
334 @Override
335 public SocketAddress getRemoteAddress() {
336 return ioSession.getRemoteAddress();
337 }
338
339 @Override
340 public SocketAddress getLocalAddress() {
341 return ioSession.getLocalAddress();
342 }
343
344 @Override
345 public int getEventMask() {
346 final IOSession currentSession = currentSessionRef.get();
347 return currentSession.getEventMask();
348 }
349
350 @Override
351 public void setEventMask(final int ops) {
352 final IOSession currentSession = currentSessionRef.get();
353 currentSession.setEventMask(ops);
354 }
355
356 @Override
357 public void setEvent(final int op) {
358 final IOSession currentSession = currentSessionRef.get();
359 currentSession.setEvent(op);
360 }
361
362 @Override
363 public void clearEvent(final int op) {
364 final IOSession currentSession = currentSessionRef.get();
365 currentSession.clearEvent(op);
366 }
367
368 @Override
369 public Timeout getSocketTimeout() {
370 return ioSession.getSocketTimeout();
371 }
372
373 @Override
374 public void setSocketTimeout(final Timeout timeout) {
375 ioSession.setSocketTimeout(timeout);
376 }
377
378 @Override
379 public int read(final ByteBuffer dst) throws IOException {
380 final IOSession currentSession = currentSessionRef.get();
381 return currentSession.read(dst);
382 }
383
384 @Override
385 public int write(final ByteBuffer src) throws IOException {
386 final IOSession currentSession = currentSessionRef.get();
387 return currentSession.write(src);
388 }
389
390 @Override
391 public void updateReadTime() {
392 ioSession.updateReadTime();
393 }
394
395 @Override
396 public void updateWriteTime() {
397 ioSession.updateWriteTime();
398 }
399
400 @Override
401 public long getLastReadTime() {
402 return ioSession.getLastReadTime();
403 }
404
405 @Override
406 public long getLastWriteTime() {
407 return ioSession.getLastWriteTime();
408 }
409
410 @Override
411 public long getLastEventTime() {
412 return ioSession.getLastEventTime();
413 }
414
415 @Override
416 public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
417 Args.notEmpty(protocolId, "Application protocol ID");
418 final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(TextUtils.toLowerCase(protocolId));
419 if (upgradeHandler != null) {
420 upgradeHandler.upgrade(this, callback);
421 } else {
422 throw new IllegalStateException("Unsupported protocol: " + protocolId);
423 }
424 }
425
426 @Override
427 public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
428 Args.notEmpty(protocolId, "Application protocol ID");
429 Args.notNull(upgradeHandler, "Protocol upgrade handler");
430 protocolUpgradeHandlerMap.put(TextUtils.toLowerCase(protocolId), upgradeHandler);
431 }
432
433 @Override
434 public String toString() {
435 final IOSession currentSession = currentSessionRef.get();
436 if (currentSession != null) {
437 return currentSession.toString();
438 } else {
439 return ioSession.toString();
440 }
441 }
442
443 }