1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.Lock;
39
40 import javax.net.ssl.SSLContext;
41
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.function.Decorator;
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.net.NamedEndpoint;
46 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48 import org.apache.hc.core5.reactor.ssl.SSLMode;
49 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51 import org.apache.hc.core5.reactor.ssl.TlsDetails;
52 import org.apache.hc.core5.util.Asserts;
53 import org.apache.hc.core5.util.Timeout;
54
55 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56
57 private final IOSession ioSession;
58 private final NamedEndpoint initialEndpoint;
59 private final Decorator<IOSession> ioSessionDecorator;
60 private final IOSessionListener sessionListener;
61 private final Queue<InternalDataChannel> closedSessions;
62 private final AtomicReference<SSLIOSession> tlsSessionRef;
63 private final AtomicReference<IOSession> currentSessionRef;
64 private final AtomicReference<IOEventHandler> eventHandlerRef;
65 private final AtomicBoolean closed;
66
67 InternalDataChannel(
68 final IOSession ioSession,
69 final NamedEndpoint initialEndpoint,
70 final Decorator<IOSession> ioSessionDecorator,
71 final IOSessionListener sessionListener,
72 final Queue<InternalDataChannel> closedSessions) {
73 this.ioSession = ioSession;
74 this.initialEndpoint = initialEndpoint;
75 this.closedSessions = closedSessions;
76 this.ioSessionDecorator = ioSessionDecorator;
77 this.sessionListener = sessionListener;
78 this.tlsSessionRef = new AtomicReference<>();
79 this.currentSessionRef = new AtomicReference<>(
80 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
81 this.eventHandlerRef = new AtomicReference<>();
82 this.closed = new AtomicBoolean(false);
83 }
84
85 @Override
86 public String getId() {
87 return ioSession.getId();
88 }
89
90 @Override
91 public NamedEndpoint getInitialEndpoint() {
92 return initialEndpoint;
93 }
94
95 @Override
96 public IOEventHandler getHandler() {
97 return eventHandlerRef.get();
98 }
99
100 @Override
101 public void upgrade(final IOEventHandler handler) {
102 final IOSession currentSession = currentSessionRef.get();
103 currentSession.upgrade(handler);
104 eventHandlerRef.set(handler);
105 }
106
107 private IOEventHandler ensureHandler(final IOSession session) {
108 final IOEventHandler handler = session.getHandler();
109 Asserts.notNull(handler, "IO event handler");
110 return handler;
111 }
112
113 @Override
114 void onIOEvent(final int readyOps) throws IOException {
115 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
116 final IOSession currentSession = currentSessionRef.get();
117 currentSession.clearEvent(SelectionKey.OP_CONNECT);
118 if (tlsSessionRef.get() == null) {
119 if (sessionListener != null) {
120 sessionListener.connected(currentSession);
121 }
122 final IOEventHandler handler = ensureHandler(currentSession);
123 handler.connected(currentSession);
124 }
125 }
126 if ((readyOps & SelectionKey.OP_READ) != 0) {
127 final IOSession currentSession = currentSessionRef.get();
128 currentSession.updateReadTime();
129 if (sessionListener != null) {
130 sessionListener.inputReady(currentSession);
131 }
132 final IOEventHandler handler = ensureHandler(currentSession);
133 handler.inputReady(currentSession, null);
134 }
135 if ((readyOps & SelectionKey.OP_WRITE) != 0
136 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
137 final IOSession currentSession = currentSessionRef.get();
138 currentSession.updateWriteTime();
139 if (sessionListener != null) {
140 sessionListener.outputReady(currentSession);
141 }
142 final IOEventHandler handler = ensureHandler(currentSession);
143 handler.outputReady(currentSession);
144 }
145 }
146
147 @Override
148 Timeout getTimeout() {
149 final IOSession currentSession = currentSessionRef.get();
150 return currentSession.getSocketTimeout();
151 }
152
153 @Override
154 void onTimeout(final Timeout timeout) throws IOException {
155 final IOSession currentSession = currentSessionRef.get();
156 if (sessionListener != null) {
157 sessionListener.timeout(currentSession);
158 }
159 final IOEventHandler handler = ensureHandler(currentSession);
160 handler.timeout(currentSession, timeout);
161 }
162
163 @Override
164 void onException(final Exception cause) {
165 final IOSession currentSession = currentSessionRef.get();
166 if (sessionListener != null) {
167 sessionListener.exception(currentSession, cause);
168 }
169 final IOEventHandler handler = currentSession.getHandler();
170 if (handler != null) {
171 handler.exception(currentSession, cause);
172 }
173 }
174
175 void onTLSSessionStart(final SSLIOSession sslSession) {
176 final IOSession currentSession = currentSessionRef.get();
177 if (sessionListener != null) {
178 sessionListener.connected(currentSession);
179 }
180 }
181
182 void onTLSSessionEnd() {
183 if (closed.compareAndSet(false, true)) {
184 closedSessions.add(this);
185 }
186 }
187
188 void disconnected() {
189 final IOSession currentSession = currentSessionRef.get();
190 if (sessionListener != null) {
191 sessionListener.disconnected(currentSession);
192 }
193 final IOEventHandler handler = currentSession.getHandler();
194 if (handler != null) {
195 handler.disconnected(currentSession);
196 }
197 }
198
199 @Override
200 public void startTls(
201 final SSLContext sslContext,
202 final NamedEndpoint endpoint,
203 final SSLBufferMode sslBufferMode,
204 final SSLSessionInitializer initializer,
205 final SSLSessionVerifier verifier,
206 final Timeout handshakeTimeout) {
207 final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
208 endpoint != null ? endpoint : initialEndpoint,
209 ioSession,
210 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
211 sslContext,
212 sslBufferMode,
213 initializer,
214 verifier,
215 new Callback<SSLIOSession>() {
216
217 @Override
218 public void execute(final SSLIOSession sslSession) {
219 onTLSSessionStart(sslSession);
220 }
221
222 },
223 new Callback<SSLIOSession>() {
224
225 @Override
226 public void execute(final SSLIOSession sslSession) {
227 onTLSSessionEnd();
228 }
229
230 },
231 handshakeTimeout);
232 if (tlsSessionRef.compareAndSet(null, sslioSession)) {
233 currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
234 if (sessionListener != null) {
235 sessionListener.startTls(sslioSession);
236 }
237 } else {
238 throw new IllegalStateException("TLS already activated");
239 }
240 }
241
242 @SuppressWarnings("resource")
243 @Override
244 public TlsDetails getTlsDetails() {
245 final SSLIOSession sslIoSession = tlsSessionRef.get();
246 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
247 }
248
249 @Override
250 public Lock getLock() {
251 return ioSession.getLock();
252 }
253
254 @Override
255 public void close() {
256 close(CloseMode.GRACEFUL);
257 }
258
259 @Override
260 public void close(final CloseMode closeMode) {
261 final IOSession currentSession = currentSessionRef.get();
262 if (closeMode == CloseMode.IMMEDIATE) {
263 closed.set(true);
264 currentSession.close(closeMode);
265 } else {
266 if (closed.compareAndSet(false, true)) {
267 try {
268 currentSession.close(closeMode);
269 } finally {
270 closedSessions.add(this);
271 }
272 }
273 }
274 }
275
276 @Override
277 public IOSession.Status getStatus() {
278 final IOSession currentSession = currentSessionRef.get();
279 return currentSession.getStatus();
280 }
281
282 @Override
283 public boolean isOpen() {
284 final IOSession currentSession = currentSessionRef.get();
285 return currentSession.isOpen();
286 }
287
288 @Override
289 public void enqueue(final Command command, final Command.Priority priority) {
290 final IOSession currentSession = currentSessionRef.get();
291 currentSession.enqueue(command, priority);
292 }
293
294 @Override
295 public boolean hasCommands() {
296 final IOSession currentSession = currentSessionRef.get();
297 return currentSession.hasCommands();
298 }
299
300 @Override
301 public Command poll() {
302 final IOSession currentSession = currentSessionRef.get();
303 return currentSession.poll();
304 }
305
306 @Override
307 public ByteChannel channel() {
308 final IOSession currentSession = currentSessionRef.get();
309 return currentSession.channel();
310 }
311
312 @Override
313 public SocketAddress getRemoteAddress() {
314 return ioSession.getRemoteAddress();
315 }
316
317 @Override
318 public SocketAddress getLocalAddress() {
319 return ioSession.getLocalAddress();
320 }
321
322 @Override
323 public int getEventMask() {
324 final IOSession currentSession = currentSessionRef.get();
325 return currentSession.getEventMask();
326 }
327
328 @Override
329 public void setEventMask(final int ops) {
330 final IOSession currentSession = currentSessionRef.get();
331 currentSession.setEventMask(ops);
332 }
333
334 @Override
335 public void setEvent(final int op) {
336 final IOSession currentSession = currentSessionRef.get();
337 currentSession.setEvent(op);
338 }
339
340 @Override
341 public void clearEvent(final int op) {
342 final IOSession currentSession = currentSessionRef.get();
343 currentSession.clearEvent(op);
344 }
345
346 @Override
347 public Timeout getSocketTimeout() {
348 return ioSession.getSocketTimeout();
349 }
350
351 @Override
352 public void setSocketTimeout(final Timeout timeout) {
353 ioSession.setSocketTimeout(timeout);
354 }
355
356 @Override
357 public int read(final ByteBuffer dst) throws IOException {
358 final IOSession currentSession = currentSessionRef.get();
359 return currentSession.read(dst);
360 }
361
362 @Override
363 public int write(final ByteBuffer src) throws IOException {
364 final IOSession currentSession = currentSessionRef.get();
365 return currentSession.write(src);
366 }
367
368 @Override
369 public void updateReadTime() {
370 ioSession.updateReadTime();
371 }
372
373 @Override
374 public void updateWriteTime() {
375 ioSession.updateWriteTime();
376 }
377
378 @Override
379 public long getLastReadTime() {
380 return ioSession.getLastReadTime();
381 }
382
383 @Override
384 public long getLastWriteTime() {
385 return ioSession.getLastWriteTime();
386 }
387
388 @Override
389 public long getLastEventTime() {
390 return ioSession.getLastEventTime();
391 }
392
393 @Override
394 public String toString() {
395 final IOSession currentSession = currentSessionRef.get();
396 if (currentSession != null) {
397 return currentSession.toString();
398 } else {
399 return ioSession.toString();
400 }
401 }
402
403 }