1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.Lock;
39
40 import javax.net.ssl.SSLContext;
41
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.io.CloseMode;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
46 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
47 import org.apache.hc.core5.reactor.ssl.SSLMode;
48 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
49 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
50 import org.apache.hc.core5.reactor.ssl.TlsDetails;
51 import org.apache.hc.core5.util.Asserts;
52 import org.apache.hc.core5.util.Timeout;
53
54 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
55
56 private final IOSession ioSession;
57 private final NamedEndpoint initialEndpoint;
58 private final IOSessionListener sessionListener;
59 private final AtomicReference<SSLIOSession> tlsSessionRef;
60 private final Queue<InternalDataChannel> closedSessions;
61 private final AtomicBoolean connected;
62 private final AtomicBoolean closed;
63
64 InternalDataChannel(
65 final IOSession ioSession,
66 final NamedEndpoint initialEndpoint,
67 final IOSessionListener sessionListener,
68 final Queue<InternalDataChannel> closedSessions) {
69 this.ioSession = ioSession;
70 this.initialEndpoint = initialEndpoint;
71 this.closedSessions = closedSessions;
72 this.sessionListener = sessionListener;
73 this.tlsSessionRef = new AtomicReference<>(null);
74 this.connected = new AtomicBoolean(false);
75 this.closed = new AtomicBoolean(false);
76 }
77
78 @Override
79 public String getId() {
80 return ioSession.getId();
81 }
82
83 @Override
84 public NamedEndpoint getInitialEndpoint() {
85 return initialEndpoint;
86 }
87
88 @Override
89 public IOEventHandler getHandler() {
90 return ioSession.getHandler();
91 }
92
93 @Override
94 public void upgrade(final IOEventHandler handler) {
95 ioSession.upgrade(handler);
96 }
97
98 private IOEventHandler ensureHandler(final IOSession session) {
99 final IOEventHandler handler = session.getHandler();
100 Asserts.notNull(handler, "IO event handler");
101 return handler;
102 }
103
104 @Override
105 void onIOEvent(final int readyOps) throws IOException {
106 final SSLIOSession tlsSession = tlsSessionRef.get();
107 final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
108 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
109 currentSession.clearEvent(SelectionKey.OP_CONNECT);
110 if (tlsSession == null && connected.compareAndSet(false, true)) {
111 if (sessionListener != null) {
112 sessionListener.connected(this);
113 }
114 final IOEventHandler handler = ensureHandler(currentSession);
115 handler.connected(this);
116 }
117 }
118 if ((readyOps & SelectionKey.OP_READ) != 0) {
119 currentSession.updateReadTime();
120 if (sessionListener != null) {
121 sessionListener.inputReady(this);
122 }
123 final IOEventHandler handler = ensureHandler(currentSession);
124 handler.inputReady(this, null);
125 }
126 if ((readyOps & SelectionKey.OP_WRITE) != 0
127 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
128 currentSession.updateWriteTime();
129 if (sessionListener != null) {
130 sessionListener.outputReady(this);
131 }
132 final IOEventHandler handler = ensureHandler(currentSession);
133 handler.outputReady(this);
134 }
135 }
136
137 @Override
138 Timeout getTimeout() {
139 return ioSession.getSocketTimeout();
140 }
141
142 @Override
143 void onTimeout(final Timeout timeout) throws IOException {
144 if (sessionListener != null) {
145 sessionListener.timeout(this);
146 }
147 final SSLIOSession tlsSession = tlsSessionRef.get();
148 final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
149 final IOEventHandler handler = ensureHandler(currentSession);
150 handler.timeout(this, timeout);
151 }
152
153 @Override
154 void onException(final Exception cause) {
155 if (sessionListener != null) {
156 sessionListener.exception(this, cause);
157 }
158 final SSLIOSession tlsSession = tlsSessionRef.get();
159 final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
160 final IOEventHandler handler = currentSession.getHandler();
161 if (handler != null) {
162 handler.exception(this, cause);
163 }
164 }
165
166 void disconnected() {
167 if (sessionListener != null) {
168 sessionListener.disconnected(this);
169 }
170 final SSLIOSession tlsSession = tlsSessionRef.get();
171 final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
172 final IOEventHandler handler = currentSession.getHandler();
173 if (handler != null) {
174 handler.disconnected(this);
175 }
176 }
177
178 @Override
179 public void startTls(
180 final SSLContext sslContext,
181 final NamedEndpoint endpoint,
182 final SSLBufferMode sslBufferMode,
183 final SSLSessionInitializer initializer,
184 final SSLSessionVerifier verifier,
185 final Timeout handshakeTimeout) {
186 if (tlsSessionRef.compareAndSet(null, new SSLIOSession(
187 endpoint != null ? endpoint : initialEndpoint,
188 ioSession,
189 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
190 sslContext,
191 sslBufferMode,
192 initializer,
193 verifier,
194 new Callback<SSLIOSession>() {
195
196 @Override
197 public void execute(final SSLIOSession sslSession) {
198 if (connected.compareAndSet(false, true)) {
199 final IOEventHandler handler = ensureHandler(ioSession);
200 try {
201 if (sessionListener != null) {
202 sessionListener.connected(InternalDataChannel.this);
203 }
204 handler.connected(InternalDataChannel.this);
205 } catch (final Exception ex) {
206 if (sessionListener != null) {
207 sessionListener.exception(InternalDataChannel.this, ex);
208 }
209 handler.exception(InternalDataChannel.this, ex);
210 }
211 }
212 }
213
214 },
215 new Callback<SSLIOSession>() {
216
217 @Override
218 public void execute(final SSLIOSession sslSession) {
219 if (closed.compareAndSet(false, true)) {
220 closedSessions.add(InternalDataChannel.this);
221 }
222 }
223
224 },
225 handshakeTimeout))) {
226 if (sessionListener != null) {
227 sessionListener.startTls(this);
228 }
229 } else {
230 throw new IllegalStateException("TLS already activated");
231 }
232 }
233
234 @SuppressWarnings("resource")
235 @Override
236 public TlsDetails getTlsDetails() {
237 final SSLIOSession sslIoSession = tlsSessionRef.get();
238 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
239 }
240
241 @Override
242 public Lock getLock() {
243 return ioSession.getLock();
244 }
245
246 private IOSession getSessionImpl() {
247 final SSLIOSession tlsSession = tlsSessionRef.get();
248 return tlsSession != null ? tlsSession : ioSession;
249 }
250
251 @Override
252 public void close() {
253 close(CloseMode.GRACEFUL);
254 }
255
256 @Override
257 public void close(final CloseMode closeMode) {
258 if (closeMode == CloseMode.IMMEDIATE) {
259 closed.set(true);
260 getSessionImpl().close(closeMode);
261 } else {
262 if (closed.compareAndSet(false, true)) {
263 try {
264 getSessionImpl().close(closeMode);
265 } finally {
266 closedSessions.add(this);
267 }
268 }
269 }
270 }
271
272 @Override
273 public IOSession.Status getStatus() {
274 return getSessionImpl().getStatus();
275 }
276
277 @Override
278 public boolean isOpen() {
279 return getSessionImpl().isOpen();
280 }
281
282 @Override
283 public void enqueue(final Command command, final Command.Priority priority) {
284 getSessionImpl().enqueue(command, priority);
285 }
286
287 @Override
288 public boolean hasCommands() {
289 return getSessionImpl().hasCommands();
290 }
291
292 @Override
293 public Command poll() {
294 return getSessionImpl().poll();
295 }
296
297 @Override
298 public ByteChannel channel() {
299 return getSessionImpl().channel();
300 }
301
302 @Override
303 public SocketAddress getRemoteAddress() {
304 return ioSession.getRemoteAddress();
305 }
306
307 @Override
308 public SocketAddress getLocalAddress() {
309 return ioSession.getLocalAddress();
310 }
311
312 @Override
313 public int getEventMask() {
314 return getSessionImpl().getEventMask();
315 }
316
317 @Override
318 public void setEventMask(final int ops) {
319 getSessionImpl().setEventMask(ops);
320 }
321
322 @Override
323 public void setEvent(final int op) {
324 getSessionImpl().setEvent(op);
325 }
326
327 @Override
328 public void clearEvent(final int op) {
329 getSessionImpl().clearEvent(op);
330 }
331
332 @Override
333 public Timeout getSocketTimeout() {
334 return ioSession.getSocketTimeout();
335 }
336
337 @Override
338 public void setSocketTimeout(final Timeout timeout) {
339 ioSession.setSocketTimeout(timeout);
340 }
341
342 @Override
343 public int read(final ByteBuffer dst) throws IOException {
344 return getSessionImpl().read(dst);
345 }
346
347 @Override
348 public int write(final ByteBuffer src) throws IOException {
349 return getSessionImpl().write(src);
350 }
351
352 @Override
353 public void updateReadTime() {
354 ioSession.updateReadTime();
355 }
356
357 @Override
358 public void updateWriteTime() {
359 ioSession.updateWriteTime();
360 }
361
362 @Override
363 public long getLastReadTime() {
364 return ioSession.getLastReadTime();
365 }
366
367 @Override
368 public long getLastWriteTime() {
369 return ioSession.getLastWriteTime();
370 }
371
372 @Override
373 public long getLastEventTime() {
374 return ioSession.getLastEventTime();
375 }
376
377 @Override
378 public String toString() {
379 final SSLIOSession tlsSession = tlsSessionRef.get();
380 if (tlsSession != null) {
381 return tlsSession.toString();
382 } else {
383 return ioSession.toString();
384 }
385 }
386
387 }