View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.ByteChannel;
34  import java.nio.channels.SelectionKey;
35  import java.util.Queue;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.concurrent.locks.Lock;
39  
40  import javax.net.ssl.SSLContext;
41  
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.function.Decorator;
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.net.NamedEndpoint;
46  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
47  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
48  import org.apache.hc.core5.reactor.ssl.SSLMode;
49  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
50  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
51  import org.apache.hc.core5.reactor.ssl.TlsDetails;
52  import org.apache.hc.core5.util.Asserts;
53  import org.apache.hc.core5.util.Timeout;
54  
55  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
56  
57      private final IOSession ioSession;
58      private final NamedEndpoint initialEndpoint;
59      private final Decorator<IOSession> ioSessionDecorator;
60      private final IOSessionListener sessionListener;
61      private final AtomicReference<IOSession> currentSessionRef;
62      private final AtomicReference<SSLIOSession> tlsSessionRef;
63      private final Queue<InternalDataChannel> closedSessions;
64      private final AtomicBoolean connected;
65      private final AtomicBoolean closed;
66  
67      InternalDataChannel(
68              final IOSession ioSession,
69              final NamedEndpoint initialEndpoint,
70              final Decorator<IOSession> ioSessionDecorator,
71              final IOSessionListener sessionListener,
72              final Queue<InternalDataChannel> closedSessions) {
73          this.ioSession = ioSession;
74          this.currentSessionRef = new AtomicReference<>(
75                  ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
76          this.initialEndpoint = initialEndpoint;
77          this.ioSessionDecorator = ioSessionDecorator;
78          this.closedSessions = closedSessions;
79          this.sessionListener = sessionListener;
80          this.tlsSessionRef = new AtomicReference<>(null);
81          this.connected = new AtomicBoolean(false);
82          this.closed = new AtomicBoolean(false);
83      }
84  
85      @Override
86      public String getId() {
87          return ioSession.getId();
88      }
89  
90      @Override
91      public NamedEndpoint getInitialEndpoint() {
92          return initialEndpoint;
93      }
94  
95      @Override
96      public IOEventHandler getHandler() {
97          final IOSession currentSession = currentSessionRef.get();
98          return currentSession.getHandler();
99      }
100 
101     @Override
102     public void upgrade(final IOEventHandler handler) {
103         final IOSession currentSession = currentSessionRef.get();
104         currentSession.upgrade(handler);
105     }
106 
107     private IOEventHandler ensureHandler(final IOSession session) {
108         final IOEventHandler handler = session.getHandler();
109         Asserts.notNull(handler, "IO event handler");
110         return handler;
111     }
112 
113     @Override
114     void onIOEvent(final int readyOps) throws IOException {
115         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
116             final IOSession currentSession = currentSessionRef.get();
117             currentSession.clearEvent(SelectionKey.OP_CONNECT);
118             if (tlsSessionRef.get() == null && connected.compareAndSet(false, true)) {
119                 if (sessionListener != null) {
120                     sessionListener.connected(this);
121                 }
122                 final IOEventHandler handler = ensureHandler(currentSession);
123                 handler.connected(this);
124             }
125         }
126         if ((readyOps & SelectionKey.OP_READ) != 0) {
127             final IOSession currentSession = currentSessionRef.get();
128             currentSession.updateReadTime();
129             if (sessionListener != null) {
130                 sessionListener.inputReady(this);
131             }
132             final IOEventHandler handler = ensureHandler(currentSession);
133             handler.inputReady(this, null);
134         }
135         if ((readyOps & SelectionKey.OP_WRITE) != 0
136                 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
137             final IOSession currentSession = currentSessionRef.get();
138             currentSession.updateWriteTime();
139             if (sessionListener != null) {
140                 sessionListener.outputReady(this);
141             }
142             final IOEventHandler handler = ensureHandler(currentSession);
143             handler.outputReady(this);
144         }
145     }
146 
147     @Override
148     Timeout getTimeout() {
149         return ioSession.getSocketTimeout();
150     }
151 
152     @Override
153     void onTimeout(final Timeout timeout) throws IOException {
154         if (sessionListener != null) {
155             sessionListener.timeout(this);
156         }
157         final IOSession currentSession = currentSessionRef.get();
158         final IOEventHandler handler = ensureHandler(currentSession);
159         handler.timeout(this, timeout);
160     }
161 
162     @Override
163     void onException(final Exception cause) {
164         if (sessionListener != null) {
165             sessionListener.exception(this, cause);
166         }
167         final IOSession currentSession = currentSessionRef.get();
168         final IOEventHandler handler = currentSession.getHandler();
169         if (handler != null) {
170             handler.exception(this, cause);
171         }
172     }
173 
174     void disconnected() {
175         if (sessionListener != null) {
176             sessionListener.disconnected(this);
177         }
178         final IOSession currentSession = currentSessionRef.get();
179         final IOEventHandler handler = currentSession.getHandler();
180         if (handler != null) {
181             handler.disconnected(this);
182         }
183     }
184 
185     @Override
186     public void startTls(
187             final SSLContext sslContext,
188             final NamedEndpoint endpoint,
189             final SSLBufferMode sslBufferMode,
190             final SSLSessionInitializer initializer,
191             final SSLSessionVerifier verifier,
192             final Timeout handshakeTimeout) {
193         final SSLIOSessionSession.html#SSLIOSession">SSLIOSession sslioSession = new SSLIOSession(
194                 endpoint != null ? endpoint : initialEndpoint,
195                 ioSession,
196                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
197                 sslContext,
198                 sslBufferMode,
199                 initializer,
200                 verifier,
201                 new Callback<SSLIOSession>() {
202 
203                     @Override
204                     public void execute(final SSLIOSession sslSession) {
205                         if (connected.compareAndSet(false, true)) {
206                             final IOEventHandler handler = ensureHandler(ioSession);
207                             try {
208                                 if (sessionListener != null) {
209                                     sessionListener.connected(InternalDataChannel.this);
210                                 }
211                                 handler.connected(InternalDataChannel.this);
212                             } catch (final Exception ex) {
213                                 if (sessionListener != null) {
214                                     sessionListener.exception(InternalDataChannel.this, ex);
215                                 }
216                                 handler.exception(InternalDataChannel.this, ex);
217                             }
218                         }
219                     }
220 
221                 },
222                 new Callback<SSLIOSession>() {
223 
224                     @Override
225                     public void execute(final SSLIOSession sslSession) {
226                         if (closed.compareAndSet(false, true)) {
227                             closedSessions.add(InternalDataChannel.this);
228                         }
229                     }
230 
231                 },
232                 handshakeTimeout);
233         if (tlsSessionRef.compareAndSet(null, sslioSession)) {
234             currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
235             if (sessionListener != null) {
236                 sessionListener.startTls(this);
237             }
238         } else {
239             throw new IllegalStateException("TLS already activated");
240         }
241     }
242 
243     @SuppressWarnings("resource")
244     @Override
245     public TlsDetails getTlsDetails() {
246         final SSLIOSession sslIoSession = tlsSessionRef.get();
247         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
248     }
249 
250     @Override
251     public Lock getLock() {
252         return ioSession.getLock();
253     }
254 
255     @Override
256     public void close() {
257         close(CloseMode.GRACEFUL);
258     }
259 
260     @Override
261     public void close(final CloseMode closeMode) {
262         final IOSession currentSession = currentSessionRef.get();
263         if (closeMode == CloseMode.IMMEDIATE) {
264             closed.set(true);
265             currentSession.close(closeMode);
266         } else {
267             if (closed.compareAndSet(false, true)) {
268                 try {
269                     currentSession.close(closeMode);
270                 } finally {
271                     closedSessions.add(this);
272                 }
273             }
274         }
275     }
276 
277     @Override
278     public IOSession.Status getStatus() {
279         final IOSession currentSession = currentSessionRef.get();
280         return currentSession.getStatus();
281     }
282 
283     @Override
284     public boolean isOpen() {
285         final IOSession currentSession = currentSessionRef.get();
286         return currentSession.isOpen();
287     }
288 
289     @Override
290     public void enqueue(final Command command, final Command.Priority priority) {
291         final IOSession currentSession = currentSessionRef.get();
292         currentSession.enqueue(command, priority);
293     }
294 
295     @Override
296     public boolean hasCommands() {
297         final IOSession currentSession = currentSessionRef.get();
298         return currentSession.hasCommands();
299     }
300 
301     @Override
302     public Command poll() {
303         final IOSession currentSession = currentSessionRef.get();
304         return currentSession.poll();
305     }
306 
307     @Override
308     public ByteChannel channel() {
309         final IOSession currentSession = currentSessionRef.get();
310         return currentSession.channel();
311     }
312 
313     @Override
314     public SocketAddress getRemoteAddress() {
315         return ioSession.getRemoteAddress();
316     }
317 
318     @Override
319     public SocketAddress getLocalAddress() {
320         return ioSession.getLocalAddress();
321     }
322 
323     @Override
324     public int getEventMask() {
325         final IOSession currentSession = currentSessionRef.get();
326         return currentSession.getEventMask();
327     }
328 
329     @Override
330     public void setEventMask(final int ops) {
331         final IOSession currentSession = currentSessionRef.get();
332         currentSession.setEventMask(ops);
333     }
334 
335     @Override
336     public void setEvent(final int op) {
337         final IOSession currentSession = currentSessionRef.get();
338         currentSession.setEvent(op);
339     }
340 
341     @Override
342     public void clearEvent(final int op) {
343         final IOSession currentSession = currentSessionRef.get();
344         currentSession.clearEvent(op);
345     }
346 
347     @Override
348     public Timeout getSocketTimeout() {
349         return ioSession.getSocketTimeout();
350     }
351 
352     @Override
353     public void setSocketTimeout(final Timeout timeout) {
354         ioSession.setSocketTimeout(timeout);
355     }
356 
357     @Override
358     public int read(final ByteBuffer dst) throws IOException {
359         final IOSession currentSession = currentSessionRef.get();
360         return currentSession.read(dst);
361     }
362 
363     @Override
364     public int write(final ByteBuffer src) throws IOException {
365         final IOSession currentSession = currentSessionRef.get();
366         return currentSession.write(src);
367     }
368 
369     @Override
370     public void updateReadTime() {
371         ioSession.updateReadTime();
372     }
373 
374     @Override
375     public void updateWriteTime() {
376         ioSession.updateWriteTime();
377     }
378 
379     @Override
380     public long getLastReadTime() {
381         return ioSession.getLastReadTime();
382     }
383 
384     @Override
385     public long getLastWriteTime() {
386         return ioSession.getLastWriteTime();
387     }
388 
389     @Override
390     public long getLastEventTime() {
391         return ioSession.getLastEventTime();
392     }
393 
394     @Override
395     public String toString() {
396         final IOSession currentSession = currentSessionRef.get();
397         if (currentSession != null) {
398             return currentSession.toString();
399         } else {
400             return ioSession.toString();
401         }
402     }
403 
404 }