1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.Lock;
39
40 import javax.net.ssl.SSLContext;
41
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.function.Decorator;
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.net.NamedEndpoint;
46 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48 import org.apache.hc.core5.reactor.ssl.SSLMode;
49 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51 import org.apache.hc.core5.reactor.ssl.TlsDetails;
52 import org.apache.hc.core5.util.Asserts;
53 import org.apache.hc.core5.util.Timeout;
54
55 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56
57 private final IOSession ioSession;
58 private final NamedEndpoint initialEndpoint;
59 private final Decorator<IOSession> ioSessionDecorator;
60 private final IOSessionListener sessionListener;
61 private final AtomicReference<IOSession> currentSessionRef;
62 private final AtomicReference<SSLIOSession> tlsSessionRef;
63 private final Queue<InternalDataChannel> closedSessions;
64 private final AtomicBoolean connected;
65 private final AtomicBoolean closed;
66
67 InternalDataChannel(
68 final IOSession ioSession,
69 final NamedEndpoint initialEndpoint,
70 final Decorator<IOSession> ioSessionDecorator,
71 final IOSessionListener sessionListener,
72 final Queue<InternalDataChannel> closedSessions) {
73 this.ioSession = ioSession;
74 this.currentSessionRef = new AtomicReference<>(
75 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
76 this.initialEndpoint = initialEndpoint;
77 this.ioSessionDecorator = ioSessionDecorator;
78 this.closedSessions = closedSessions;
79 this.sessionListener = sessionListener;
80 this.tlsSessionRef = new AtomicReference<>(null);
81 this.connected = new AtomicBoolean(false);
82 this.closed = new AtomicBoolean(false);
83 }
84
85 @Override
86 public String getId() {
87 return ioSession.getId();
88 }
89
90 @Override
91 public NamedEndpoint getInitialEndpoint() {
92 return initialEndpoint;
93 }
94
95 @Override
96 public IOEventHandler getHandler() {
97 final IOSession currentSession = currentSessionRef.get();
98 return currentSession.getHandler();
99 }
100
101 @Override
102 public void upgrade(final IOEventHandler handler) {
103 final IOSession currentSession = currentSessionRef.get();
104 currentSession.upgrade(handler);
105 }
106
107 private IOEventHandler ensureHandler(final IOSession session) {
108 final IOEventHandler handler = session.getHandler();
109 Asserts.notNull(handler, "IO event handler");
110 return handler;
111 }
112
113 @Override
114 void onIOEvent(final int readyOps) throws IOException {
115 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
116 final IOSession currentSession = currentSessionRef.get();
117 currentSession.clearEvent(SelectionKey.OP_CONNECT);
118 if (tlsSessionRef.get() == null && connected.compareAndSet(false, true)) {
119 if (sessionListener != null) {
120 sessionListener.connected(this);
121 }
122 final IOEventHandler handler = ensureHandler(currentSession);
123 handler.connected(this);
124 }
125 }
126 if ((readyOps & SelectionKey.OP_READ) != 0) {
127 final IOSession currentSession = currentSessionRef.get();
128 currentSession.updateReadTime();
129 if (sessionListener != null) {
130 sessionListener.inputReady(this);
131 }
132 final IOEventHandler handler = ensureHandler(currentSession);
133 handler.inputReady(this, null);
134 }
135 if ((readyOps & SelectionKey.OP_WRITE) != 0
136 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
137 final IOSession currentSession = currentSessionRef.get();
138 currentSession.updateWriteTime();
139 if (sessionListener != null) {
140 sessionListener.outputReady(this);
141 }
142 final IOEventHandler handler = ensureHandler(currentSession);
143 handler.outputReady(this);
144 }
145 }
146
147 @Override
148 Timeout getTimeout() {
149 return ioSession.getSocketTimeout();
150 }
151
152 @Override
153 void onTimeout(final Timeout timeout) throws IOException {
154 if (sessionListener != null) {
155 sessionListener.timeout(this);
156 }
157 final IOSession currentSession = currentSessionRef.get();
158 final IOEventHandler handler = ensureHandler(currentSession);
159 handler.timeout(this, timeout);
160 }
161
162 @Override
163 void onException(final Exception cause) {
164 if (sessionListener != null) {
165 sessionListener.exception(this, cause);
166 }
167 final IOSession currentSession = currentSessionRef.get();
168 final IOEventHandler handler = currentSession.getHandler();
169 if (handler != null) {
170 handler.exception(this, cause);
171 }
172 }
173
174 void disconnected() {
175 if (sessionListener != null) {
176 sessionListener.disconnected(this);
177 }
178 final IOSession currentSession = currentSessionRef.get();
179 final IOEventHandler handler = currentSession.getHandler();
180 if (handler != null) {
181 handler.disconnected(this);
182 }
183 }
184
185 @Override
186 public void startTls(
187 final SSLContext sslContext,
188 final NamedEndpoint endpoint,
189 final SSLBufferMode sslBufferMode,
190 final SSLSessionInitializer initializer,
191 final SSLSessionVerifier verifier,
192 final Timeout handshakeTimeout) {
193 final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
194 endpoint != null ? endpoint : initialEndpoint,
195 ioSession,
196 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
197 sslContext,
198 sslBufferMode,
199 initializer,
200 verifier,
201 new Callback<SSLIOSession>() {
202
203 @Override
204 public void execute(final SSLIOSession sslSession) {
205 if (connected.compareAndSet(false, true)) {
206 final IOEventHandler handler = ensureHandler(ioSession);
207 try {
208 if (sessionListener != null) {
209 sessionListener.connected(InternalDataChannel.this);
210 }
211 handler.connected(InternalDataChannel.this);
212 } catch (final Exception ex) {
213 if (sessionListener != null) {
214 sessionListener.exception(InternalDataChannel.this, ex);
215 }
216 handler.exception(InternalDataChannel.this, ex);
217 }
218 }
219 }
220
221 },
222 new Callback<SSLIOSession>() {
223
224 @Override
225 public void execute(final SSLIOSession sslSession) {
226 if (closed.compareAndSet(false, true)) {
227 closedSessions.add(InternalDataChannel.this);
228 }
229 }
230
231 },
232 handshakeTimeout);
233 if (tlsSessionRef.compareAndSet(null, sslioSession)) {
234 currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
235 if (sessionListener != null) {
236 sessionListener.startTls(this);
237 }
238 } else {
239 throw new IllegalStateException("TLS already activated");
240 }
241 }
242
243 @SuppressWarnings("resource")
244 @Override
245 public TlsDetails getTlsDetails() {
246 final SSLIOSession sslIoSession = tlsSessionRef.get();
247 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
248 }
249
250 @Override
251 public Lock getLock() {
252 return ioSession.getLock();
253 }
254
255 @Override
256 public void close() {
257 close(CloseMode.GRACEFUL);
258 }
259
260 @Override
261 public void close(final CloseMode closeMode) {
262 final IOSession currentSession = currentSessionRef.get();
263 if (closeMode == CloseMode.IMMEDIATE) {
264 closed.set(true);
265 currentSession.close(closeMode);
266 } else {
267 if (closed.compareAndSet(false, true)) {
268 try {
269 currentSession.close(closeMode);
270 } finally {
271 closedSessions.add(this);
272 }
273 }
274 }
275 }
276
277 @Override
278 public IOSession.Status getStatus() {
279 final IOSession currentSession = currentSessionRef.get();
280 return currentSession.getStatus();
281 }
282
283 @Override
284 public boolean isOpen() {
285 final IOSession currentSession = currentSessionRef.get();
286 return currentSession.isOpen();
287 }
288
289 @Override
290 public void enqueue(final Command command, final Command.Priority priority) {
291 final IOSession currentSession = currentSessionRef.get();
292 currentSession.enqueue(command, priority);
293 }
294
295 @Override
296 public boolean hasCommands() {
297 final IOSession currentSession = currentSessionRef.get();
298 return currentSession.hasCommands();
299 }
300
301 @Override
302 public Command poll() {
303 final IOSession currentSession = currentSessionRef.get();
304 return currentSession.poll();
305 }
306
307 @Override
308 public ByteChannel channel() {
309 final IOSession currentSession = currentSessionRef.get();
310 return currentSession.channel();
311 }
312
313 @Override
314 public SocketAddress getRemoteAddress() {
315 return ioSession.getRemoteAddress();
316 }
317
318 @Override
319 public SocketAddress getLocalAddress() {
320 return ioSession.getLocalAddress();
321 }
322
323 @Override
324 public int getEventMask() {
325 final IOSession currentSession = currentSessionRef.get();
326 return currentSession.getEventMask();
327 }
328
329 @Override
330 public void setEventMask(final int ops) {
331 final IOSession currentSession = currentSessionRef.get();
332 currentSession.setEventMask(ops);
333 }
334
335 @Override
336 public void setEvent(final int op) {
337 final IOSession currentSession = currentSessionRef.get();
338 currentSession.setEvent(op);
339 }
340
341 @Override
342 public void clearEvent(final int op) {
343 final IOSession currentSession = currentSessionRef.get();
344 currentSession.clearEvent(op);
345 }
346
347 @Override
348 public Timeout getSocketTimeout() {
349 return ioSession.getSocketTimeout();
350 }
351
352 @Override
353 public void setSocketTimeout(final Timeout timeout) {
354 ioSession.setSocketTimeout(timeout);
355 }
356
357 @Override
358 public int read(final ByteBuffer dst) throws IOException {
359 final IOSession currentSession = currentSessionRef.get();
360 return currentSession.read(dst);
361 }
362
363 @Override
364 public int write(final ByteBuffer src) throws IOException {
365 final IOSession currentSession = currentSessionRef.get();
366 return currentSession.write(src);
367 }
368
369 @Override
370 public void updateReadTime() {
371 ioSession.updateReadTime();
372 }
373
374 @Override
375 public void updateWriteTime() {
376 ioSession.updateWriteTime();
377 }
378
379 @Override
380 public long getLastReadTime() {
381 return ioSession.getLastReadTime();
382 }
383
384 @Override
385 public long getLastWriteTime() {
386 return ioSession.getLastWriteTime();
387 }
388
389 @Override
390 public long getLastEventTime() {
391 return ioSession.getLastEventTime();
392 }
393
394 @Override
395 public String toString() {
396 final IOSession currentSession = currentSessionRef.get();
397 if (currentSession != null) {
398 return currentSession.toString();
399 } else {
400 return ioSession.toString();
401 }
402 }
403
404 }